Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
89752: jobs/cdc: add metrics for paused jobs  r=miretskiy a=jayshrivastava

 This change adds new metrics to count paused jobs for every job type. For
  example, the metric for paused changefeed jobs is
  `jobs.changefeed.currently_paused`. These metrics are counted at an
  interval defined by the cluster setting `jobs.metrics.interval.poll`.
  
  This is implemented by a job which periodically queries `system.jobs`
  to count the number of paused jobs. This job is of the newly added type
  `jobspb.TypePollJobsStats`. When a node starts it's job registry, it will
  create an adoptable stats polling job if it does not exist already using a
  transaction.
  
  This change adds a test which pauses and resumes changefeeds while asserting
  the value of the `jobs.changefeed.currently_paused` metric. It also adds a
  logictest to ensure one instance of the stats polling job is created in a
  cluster.
  
  Resolves: #85467
  
  Release note (general change): This change adds new metrics to count
  paused jobs for every job type. For example, the metric for paused
  changefeed jobs is `jobs.changefeed.currently_paused`. These metrics
  are updated at an interval defined by the cluster setting
  `jobs.metrics.interval.poll`, which is defauled to 10 seconds.
  
  Epic: None


94633: kvserver: document reproposals r=nvanbenschoten a=tbg

Reproposals are a deep rabbit hole and an area in which past changes
were all related to subtle bugs. Write it all up and in particular make
some simplifications that ought to be possible if my understanding is
correct:

- have proposals always enter `(*Replica).propose` without a
  MaxLeaseIndex or prior encoded command set, i.e. `propose`
  behaves the same for reproposals as for first proposals.
- assert that after a failed call to tryReproposeWithNewLeaseIndex,
  the command is not in the proposals map, i.e. check absence of
  a leak.
- replace code that should be impossible to reach (and had me confused
  for a good amount of time) with an assertion.
- add long comment on `r.mu.proposals`.

This commit also moves `tryReproposeWithNewLeaseIndex` off `(*Replica)`,
which is possible due to recent changes[^1]. In doing so, I realized
there was a (small) data race (now fixed): when returning a
`NotLeaseholderError` from that method, we weren't acquiring `r.mu`. It
may have looked as though we were holding it already since we're
accessing `r.mu.propBuf`, however that field has special semantics - it
wraps `r.mu` and acquires it when needed.

[^1]: The "below raft" test mentioned in the previous comment was
changed in #93785 and
no longer causes a false positive.

Epic: CRDB-220
Release note: None


96650: kvserver: extract kvstorage.DestroyReplica r=pavelkalinnikov a=tbg

This series of commits extracts `(*Replica).preDestroyRaftMuLocked` into a
free-standing method `kvstorage.DestroyReplica`.

In doing so, it separates the in-memory and on-disk steps that are a part
of replica removal, and makes the on-disk steps unit testable.

Touches #93241.

Epic: CRDB-220
Release note: None


96659: sql: wrap stacktraceless errors with errors.Wrap r=andreimatei a=ecwall

Fixes #95794

This replaces the previous attempt to add logging here #95797.

The context itself cannot be augmented to add a stack trace to errors because
it interferes with grpc timeout logic - gRPC compares errors directly without
checking causes https://github.com/grpc/grpc-go/blob/v1.46.0/rpc_util.go#L833.
Although the method signature allows it, `Context.Err()` should not be
overriden to customize the error:
```
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
// After Err returns a non-nil error, successive calls to Err return the same error.
Err() error
```
Additionally, a child context of the augmented context may end up being used
which will circumvent the stack trace capture.

This change instead wraps `errors.Wrap` in a few places that might end up
helping debug the original problem:
1) Where we call `Context.Err()` directly.
2) Where gRPC returns an error after possibly calling `Context.Err()`
   internally or returns an error that does not have a stack trace.

Release note: None

96770: storage: don't modify the given cfg.Opts r=RaduBerinde a=RaduBerinde

This change improves the `NewPebble` code to not modify the given `cfg.Opts`. Such behavior is surprising and can trip up tests that reuse the same config.

Also, `ResolveEncryptedEnvOptions` and `wrapFilesystemMiddleware` no longer modify the `Options` directly; and `CheckNoRegistryFile` is now a standalone function.

Release note: None
Epic: none

96793: kvserver: de-flake TestReplicaProbeRequest r=pavelkalinnikov a=tbg

Chanced upon this failure mode in unrelated PR #96781.

Epic: none
Release note: None


Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
5 people committed Feb 8, 2023
7 parents dbfc71b + 690da3e + 866d58a + 6b7de86 + 00fd3ef + f314232 + daf2dad commit 844a370
Show file tree
Hide file tree
Showing 60 changed files with 1,206 additions and 426 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-36 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-38 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-36</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-38</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ ALL_TESTS = [
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1124,6 +1125,7 @@ GO_TARGETS = [
"//pkg/jobs/jobsauth:jobsauth",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
Expand Down
17 changes: 8 additions & 9 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (c *Connector) RangeLookup(
}
return resp.Descriptors, resp.PrefetchedDescriptors, nil
}
return nil, nil, ctx.Err()
return nil, nil, errors.Wrap(ctx.Err(), "range lookup")
}

// NodesUI implements the serverpb.TenantStatusServer interface
Expand Down Expand Up @@ -544,7 +544,7 @@ func (c *Connector) NewIterator(
rangeDescriptors = append(rangeDescriptors, e.RangeDescriptors...)
}
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "new iterator")
}

// TokenBucket implements the kvtenant.TokenBucketProvider interface.
Expand Down Expand Up @@ -575,7 +575,7 @@ func (c *Connector) TokenBucket(
}
return resp, nil
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "token bucket")
}

// GetSpanConfigRecords implements the spanconfig.KVAccessor interface.
Expand All @@ -587,7 +587,7 @@ func (c *Connector) GetSpanConfigRecords(
Targets: spanconfig.TargetsToProtos(targets),
})
if err != nil {
return err
return errors.Wrap(err, "get span configs error")
}

records, err = spanconfig.EntriesToRecords(resp.SpanConfigEntries)
Expand Down Expand Up @@ -617,7 +617,7 @@ func (c *Connector) UpdateSpanConfigRecords(
MaxCommitTimestamp: maxCommitTS,
})
if err != nil {
return err
return errors.Wrap(err, "update span configs error")
}
if resp.Error.IsSet() {
// Logical error; propagate as such.
Expand Down Expand Up @@ -655,13 +655,12 @@ func (c *Connector) GetAllSystemSpanConfigsThatApply(
) ([]roachpb.SpanConfig, error) {
var spanConfigs []roachpb.SpanConfig
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
var err error
resp, err := c.GetAllSystemSpanConfigsThatApply(
ctx, &roachpb.GetAllSystemSpanConfigsThatApplyRequest{
TenantID: id,
})
if err != nil {
return err
return errors.Wrap(err, "get all system span configs that apply error")
}

spanConfigs = resp.SpanConfigs
Expand Down Expand Up @@ -713,7 +712,7 @@ func (c *Connector) withClient(
}
return f(ctx, c)
}
return ctx.Err()
return errors.Wrap(ctx.Err(), "with client")
}

// getClient returns the singleton InternalClient if one is currently active. If
Expand Down Expand Up @@ -778,7 +777,7 @@ func (c *Connector) dialAddrs(ctx context.Context) (*client, error) {
}, nil
}
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "dial addrs")
}

func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
Expand Down
15 changes: 6 additions & 9 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,17 +1532,14 @@ func pebbleCryptoInitializer() error {
}
}

cfg := storage.PebbleConfig{
StorageConfig: storageConfig,
Opts: storage.DefaultPebbleOptions(),
}

// This has the side effect of storing the encrypted FS into cfg.Opts.FS.
_, _, err := storage.ResolveEncryptedEnvOptions(&cfg)
_, encryptedEnv, err := storage.ResolveEncryptedEnvOptions(&storageConfig, vfs.Default, false /* readOnly */)
if err != nil {
return err
}

pebbleToolFS.set(cfg.Opts.FS)
if encryptedEnv != nil {
pebbleToolFS.set(encryptedEnv.FS)
} else {
pebbleToolFS.set(vfs.Default)
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 53 descriptors and 52 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
Examining 12 jobs...
Examining 14 jobs...
ERROR: validation failed
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ const (

V23_1_DeleteDroppedFunctionDescriptors

// V23_1_CreateJobsMetricsPollingJob creates the permanent job
// responsible for polling the jobs table for metrics.
V23_1_CreateJobsMetricsPollingJob

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -700,6 +704,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_DeleteDroppedFunctionDescriptors,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 36},
},
{
Key: V23_1_CreateJobsMetricsPollingJob,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 38},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
15 changes: 15 additions & 0 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries"
executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size"
debugPausePointsSettingKey = "jobs.debug.pausepoints"
metricsPollingIntervalKey = "jobs.metrics.interval.poll"
)

const (
Expand Down Expand Up @@ -70,6 +71,10 @@ const (
// error. If this size is exceeded, the error will be formatted as a string
// and then truncated to fit the size.
defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB

// defaultPollForMetricsInterval is the default interval to poll the jobs
// table for metrics.
defaultPollForMetricsInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -100,6 +105,16 @@ var (
settings.PositiveDuration,
)

// PollJobsMetricsInterval is the interval at which a tenant in the cluster
// will poll the jobs table for metrics
PollJobsMetricsInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
metricsPollingIntervalKey,
"the interval at which a node in the cluster will poll the jobs table for metrics",
defaultPollForMetricsInterval,
settings.PositiveDuration,
)

gcIntervalSetting = settings.RegisterDurationSetting(
settings.TenantWritable,
gcIntervalSettingKey,
Expand Down
104 changes: 103 additions & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -229,7 +230,8 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
ManagerDisableJobCreation: true,
}
args.Knobs.UpgradeManager = &upgradebase.TestingKnobs{
DontUseJobs: true,
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
}
args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}

Expand Down Expand Up @@ -3456,3 +3458,103 @@ func TestPausepoints(t *testing.T) {
})
}
}

func TestPausedMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
defer s.Stopper().Stop(ctx)

jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond)
runner := sqlutils.MakeSQLRunner(sqlDB)
reg := s.JobRegistry().(*jobs.Registry)

waitForPausedCount := func(typ jobspb.Type, numPaused int64) {
testutils.SucceedsSoon(t, func() error {
currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value()
if reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() != numPaused {
return fmt.Errorf(
"expected (%+v) paused jobs of type (%+v), found (%+v)",
numPaused,
typ,
currentlyPaused,
)
}
return nil
})
}

typeToRecord := map[jobspb.Type]jobs.Record{
jobspb.TypeChangefeed: {
Details: jobspb.ChangefeedDetails{},
Progress: jobspb.ChangefeedProgress{},
Username: username.TestUserName(),
},
jobspb.TypeImport: {
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
},
jobspb.TypeSchemaChange: {
Details: jobspb.SchemaChangeDetails{},
Progress: jobspb.SchemaChangeProgress{},
Username: username.TestUserName(),
},
}
for typ := range typeToRecord {
jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
},
}
}, jobs.UsesTenantCostControl)
}

makeJob := func(ctx context.Context,
typ jobspb.Type,
) *jobs.StartableJob {
j, err := jobs.TestingCreateAndStartJob(ctx, reg, s.InternalDB().(isql.DB), typeToRecord[typ])
if err != nil {
t.Fatal(err)
}
return j
}

cfJob := makeJob(context.Background(), jobspb.TypeChangefeed)
cfJob2 := makeJob(context.Background(), jobspb.TypeChangefeed)
importJob := makeJob(context.Background(), jobspb.TypeImport)
scJob := makeJob(context.Background(), jobspb.TypeSchemaChange)

// Pause all job types.
runner.Exec(t, "PAUSE JOB $1", cfJob.ID())
waitForPausedCount(jobspb.TypeChangefeed, 1)
runner.Exec(t, "PAUSE JOB $1", cfJob2.ID())
waitForPausedCount(jobspb.TypeChangefeed, 2)
runner.Exec(t, "PAUSE JOB $1", importJob.ID())
waitForPausedCount(jobspb.TypeImport, 1)
runner.Exec(t, "PAUSE JOB $1", scJob.ID())
waitForPausedCount(jobspb.TypeSchemaChange, 1)

// Resume / cancel jobs.
runner.Exec(t, "RESUME JOB $1", cfJob.ID())
waitForPausedCount(jobspb.TypeChangefeed, 1)
runner.Exec(t, "CANCEL JOB $1", cfJob2.ID())
waitForPausedCount(jobspb.TypeChangefeed, 0)
runner.Exec(t, "RESUME JOB $1", importJob.ID())
waitForPausedCount(jobspb.TypeImport, 0)
runner.Exec(t, "CANCEL JOB $1", scJob.ID())
waitForPausedCount(jobspb.TypeSchemaChange, 0)

runner.Exec(t, "CANCEL JOB $1", cfJob.ID())
runner.Exec(t, "CANCEL JOB $1", importJob.ID())
}
12 changes: 11 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobspb",
Expand Down Expand Up @@ -64,4 +64,14 @@ go_proto_library(
],
)

go_test(
name = "jobspb_test",
srcs = ["wrap_test.go"],
args = ["-test.timeout=295s"],
deps = [
":jobspb",
"@com_github_stretchr_testify//assert",
],
)

get_x_data(name = "get_x_data")
13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,12 @@ message SchemaTelemetryDetails {
message SchemaTelemetryProgress {
}

message PollJobsStatsDetails {
}

message PollJobsStatsProgress {
}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1200,7 +1206,12 @@ message Payload {
// and publish it to the telemetry event log. These jobs are typically
// created by a built-in schedule named "sql-schema-telemetry".
SchemaTelemetryDetails schema_telemetry = 37;

KeyVisualizerDetails keyVisualizerDetails = 38;

// PollJobsStats jobs poll the jobs table for statistics metrics as the number of
// paused jobs.
PollJobsStatsDetails poll_jobs_stats = 39;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1263,6 +1274,7 @@ message Progress {
RowLevelTTLProgress row_level_ttl = 25 [(gogoproto.customname)="RowLevelTTL"];
SchemaTelemetryProgress schema_telemetry = 26;
KeyVisualizerProgress keyVisualizerProgress = 27;
PollJobsStatsProgress pollJobsStats = 28;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1293,6 +1305,7 @@ enum Type {
ROW_LEVEL_TTL = 16 [(gogoproto.enumvalue_customname) = "TypeRowLevelTTL"];
AUTO_SCHEMA_TELEMETRY = 17 [(gogoproto.enumvalue_customname) = "TypeAutoSchemaTelemetry"];
KEY_VISUALIZER = 18 [(gogoproto.enumvalue_customname) = "TypeKeyVisualizer"];
POLL_JOBS_STATS = 19 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"];
}

message Job {
Expand Down
Loading

0 comments on commit 844a370

Please sign in to comment.