diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 6c7c3bb35bcb..9fb60f4e210e 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -936,5 +936,6 @@ func init() { job: job, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 91fc907acdaf..a28c6672b46a 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2554,5 +2554,6 @@ func init() { settings: settings, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a819a7948e91..0ed4949405e7 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -69,6 +69,7 @@ func init() { func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return &changefeedResumer{job: job} }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel index feac5d043624..d9344d17d9b4 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel +++ b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel @@ -49,6 +49,8 @@ go_test( "//pkg/cloud", "//pkg/cloud/nodelocal", "//pkg/cloud/nullsink", + "//pkg/jobs", + "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/kv/kvclient/kvtenant", "//pkg/multitenant", @@ -63,6 +65,7 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/stats", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index 4c307fbcbc6d..85a425ee852e 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -11,6 +11,7 @@ package tenantcostclient_test import ( "bytes" "context" + gosql "database/sql" "fmt" "io" "io/ioutil" @@ -30,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" "github.com/cockroachdb/cockroach/pkg/cloud/nullsink" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/multitenant" @@ -43,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -968,6 +972,97 @@ func TestSQLLivenessExemption(t *testing.T) { ) } +// TestScheduledJobsConsumption verifies that the scheduled jobs system itself +// does not consume RUs, but that the jobs it runs do consume RUs. +func TestScheduledJobsConsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) + tenantcostclient.TargetPeriodSetting.Override(ctx, &st.SV, time.Millisecond*20) + + hostServer, _, _ := serverutils.StartServer(t, base.TestServerArgs{Settings: st}) + defer hostServer.Stopper().Stop(ctx) + + testProvider := newTestProvider() + + env := jobstest.NewJobSchedulerTestEnv(jobstest.UseSystemTables, timeutil.Now()) + var zeroDuration time.Duration + var execSchedules func() error + var tenantServer serverutils.TestTenantInterface + var tenantDB *gosql.DB + tenantServer, tenantDB = serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ + TenantID: serverutils.TestTenantID(), + Settings: st, + AllowSettingClusterSettings: true, + TestingKnobs: base.TestingKnobs{ + TenantTestingKnobs: &sql.TenantTestingKnobs{ + OverrideTokenBucketProvider: func(kvtenant.TokenBucketProvider) kvtenant.TokenBucketProvider { + return testProvider + }, + }, + TTL: &sql.TTLTestingKnobs{ + // Don't wait until we can use a historical query. + AOSTDuration: &zeroDuration, + }, + JobsTestingKnobs: &jobs.TestingKnobs{ + JobSchedulerEnv: env, + TakeOverJobsScheduling: func(fn func(ctx context.Context, maxSchedules int64) error) { + execSchedules = func() error { + defer tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue() + return fn(ctx, 0) + } + }, + IntervalOverrides: jobs.TestingIntervalOverrides{ + // Force fast adoption and resumption. + Adopt: &zeroDuration, + RetryInitialDelay: &zeroDuration, + RetryMaxDelay: &zeroDuration, + }, + }, + }, + }) + + r := sqlutils.MakeSQLRunner(tenantDB) + // Create a table with rows that expire after a TTL. This will trigger the + // creation of a TTL job. + r.Exec(t, "CREATE TABLE t (v INT PRIMARY KEY) WITH ("+ + "ttl_expire_after = '1 microsecond', ttl_job_cron = '* * * * ?', ttl_delete_batch_size = 1)") + r.Exec(t, "INSERT INTO t SELECT x FROM generate_series(1,100) g(x)") + before := testProvider.waitForConsumption(t) + + // Ensure the job system is not consuming RUs when scanning/claiming jobs. + tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue() + env.AdvanceTime(24 * time.Hour) + time.Sleep(100 * time.Millisecond) + after := testProvider.waitForConsumption(t) + after.Sub(&before) + require.Zero(t, after.WriteBatches) + require.Zero(t, after.WriteBytes) + // Expect up to 3 batches for initial auto-stats query, schema catalog fill, + // and anything else that happens once during server startup but might not be + // done by this point. + require.LessOrEqual(t, after.ReadBatches, uint64(3)) + + // Make sure that at least 100 writes (deletes) are reported. The TTL job + // should not be exempt from cost control. + testutils.SucceedsSoon(t, func() error { + // Run all job schedules. + env.AdvanceTime(time.Minute) + require.NoError(t, execSchedules()) + + // Check consumption. + c := testProvider.waitForConsumption(t) + c.Sub(&before) + if c.WriteRequests < 100 { + return errors.New("no write requests reported") + } + return nil + }) +} + // TestConsumption verifies consumption reporting from a tenant server process. func TestConsumptionChangefeeds(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index eeebe550c20e..ca0364c6ea0d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -190,5 +190,7 @@ func init() { func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &streamIngestionResumer{job: job} - }) + }, + jobs.UsesTenantCostControl, + ) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index a54e2ef8adb3..9da9d7eebc38 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -206,5 +206,6 @@ func init() { job: job, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go index def40e447ac7..9789e5fd1ca7 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go @@ -120,5 +120,6 @@ func init() { timer: ts.NewTimer(), } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go index 89bf9dce2cd7..f0313f0b84b3 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go @@ -131,7 +131,7 @@ func TestStreamReplicationProducerJob(t *testing.T) { resumer: r, revertingFinished: waitJobFinishReverting, } - }) + }, jobs.UsesTenantCostControl) return mt, func() { in <- struct{}{} // Signals the timer that a new time is assigned to time source diff --git a/pkg/cli/debug_job_trace_test.go b/pkg/cli/debug_job_trace_test.go index 480ee4dabfd8..4ac3741b4f82 100644 --- a/pkg/cli/debug_job_trace_test.go +++ b/pkg/cli/debug_job_trace_test.go @@ -96,6 +96,7 @@ func TestDebugJobTrace(t *testing.T) { recordedSpanCh: recordedSpanCh, } }, + jobs.UsesTenantCostControl, ) // Create a "backup job" but we have overridden the resumer constructor above diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 3c195bd0d1a0..9f65f8f93152 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/base", "//pkg/jobs/jobspb", "//pkg/kv", + "//pkg/multitenant", "//pkg/roachpb", "//pkg/scheduledjobs", "//pkg/security", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 9b20dee0c46b..4f6229f165b0 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -318,6 +319,12 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven } resumeCtx, cancel := r.makeCtx() + // If the job's type was registered to disable tenant cost control, then + // exclude the job's costs from tenant accounting. + if opts, ok := options[payload.Type()]; ok && opts.disableTenantCostControl { + resumeCtx = multitenant.WithTenantCostControlExemption(resumeCtx) + } + if alreadyAdopted := r.addAdoptedJob(jobID, s, cancel); alreadyAdopted { return nil } diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 56d10dc1fe0c..612d5dcfed25 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -176,7 +176,7 @@ func TestJobsControlForSchedules(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) record := Record{ Description: "fake job", @@ -289,7 +289,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) record := Record{ Description: "fake job", @@ -394,7 +394,7 @@ func TestJobControlByType(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) } for _, jobType := range allJobTypes { diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index b94abbc4a04f..820dbdbd8714 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security" @@ -547,6 +548,11 @@ func StartJobSchedulerDaemon( cfg *scheduledjobs.JobExecutionConfig, env scheduledjobs.JobSchedulerEnv, ) { + // Since the job scheduler system is not under user control, exclude it from + // from cost accounting and control. Individual jobs are not part of this + // exclusion. + ctx = multitenant.WithTenantCostControlExemption(ctx) + schedulerEnv := env var daemonKnobs *TestingKnobs if jobsKnobs, ok := cfg.TestingKnobs.(*TestingKnobs); ok { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 44d7dbb5a73b..b0925bc51c4b 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -323,7 +323,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) { return rts.onPauseRequest(ctx, execCfg, txn, progress) }, } - }) + }, jobs.UsesTenantCostControl) } func (rts *registryTestSuite) tearDown() { @@ -1216,7 +1216,7 @@ func TestJobLifecycle(t *testing.T) { } }, } - }) + }, jobs.UsesTenantCostControl) startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) { beforeTime := timeutil.Now() @@ -1760,7 +1760,7 @@ func TestJobLifecycle(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) jobID := registry.MakeJobID() record := jobs.Record{ @@ -2212,7 +2212,7 @@ func TestShowJobWhenComplete(t *testing.T) { } }, } - }) + }, jobs.UsesTenantCostControl) type row struct { id jobspb.JobID @@ -2373,7 +2373,7 @@ func TestJobInTxn(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) // Piggy back on RESTORE to be able to create a failing test job. sql.AddPlanHook( "test", @@ -2404,7 +2404,7 @@ func TestJobInTxn(t *testing.T) { return errors.New("RESTORE failed") }, } - }) + }, jobs.UsesTenantCostControl) t.Run("rollback txn", func(t *testing.T) { start := timeutil.Now() @@ -2499,7 +2499,7 @@ func TestStartableJobMixedVersion(t *testing.T) { jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{} - }) + }, jobs.UsesTenantCostControl) var j *jobs.StartableJob jobID := jr.MakeJobID() require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { @@ -2542,7 +2542,7 @@ func TestStartableJob(t *testing.T) { return resumeFunc.Load().(func(ctx context.Context) error)(ctx) }, } - }) + }, jobs.UsesTenantCostControl) woodyP, _ := security.MakeSQLUsernameFromUserInput("Woody Pride", security.UsernameValidation) rec := jobs.Record{ Description: "There's a snake in my boot!", @@ -2737,7 +2737,7 @@ func TestStartableJobTxnRetry(t *testing.T) { jr := s.JobRegistry().(*jobs.Registry) jobs.RegisterConstructor(jobspb.TypeRestore, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{} - }) + }, jobs.UsesTenantCostControl) rec := jobs.Record{ Details: jobspb.RestoreDetails{}, Progress: jobspb.RestoreProgress{}, @@ -2785,7 +2785,7 @@ func TestUnmigratedSchemaChangeJobs(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) select { case <-resuming: t.Fatal("job was resumed") @@ -2845,7 +2845,7 @@ func TestRegistryTestingNudgeAdoptionQueue(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) before := timeutil.Now() jobID := registry.MakeJobID() _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */) @@ -2914,10 +2914,10 @@ func TestMetrics(t *testing.T) { } jobs.RegisterConstructor(jobspb.TypeBackup, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return res - }) + }, jobs.UsesTenantCostControl) jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return res - }) + }, jobs.UsesTenantCostControl) setup := func(t *testing.T) ( s serverutils.TestServerInterface, db *gosql.DB, r *jobs.Registry, cleanup func(), ) { @@ -3135,7 +3135,7 @@ func TestLoseLeaseDuringExecution(t *testing.T) { return err }, } - }) + }, jobs.UsesTenantCostControl) _, err := registry.CreateJobWithTxn(ctx, rec, registry.MakeJobID(), nil) require.NoError(t, err) @@ -3189,7 +3189,7 @@ func TestPauseReason(t *testing.T) { } }, } - }) + }, jobs.UsesTenantCostControl) rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, @@ -3406,7 +3406,7 @@ func TestPausepoints(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 43ff2bb4ed2d..4e616ff2633a 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -703,6 +704,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) { // jobs if it observes a failure. Otherwise it starts all the main daemons of // registry that poll the jobs table and start/cancel/gc jobs. func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { + // Since the job polling system is outside user control, exclude it from cost + // accounting and control. Individual jobs are not part of this exclusion. + ctx = multitenant.WithTenantCostControlExemption(ctx) + wrapWithSession := func(f withSessionFunc) func(ctx context.Context) { return func(ctx context.Context) { r.withSession(ctx, f) } } @@ -1076,6 +1081,51 @@ type Resumer interface { OnFailOrCancel(ctx context.Context, execCtx interface{}) error } +// RegisterOption is the template for options passed to the RegisterConstructor +// function. +type RegisterOption func(opts *registerOptions) + +// DisablesTenantCostControl allows job implementors to exclude their job's +// Storage I/O costs (i.e. from reads/writes) from tenant accounting, based on +// this principle: +// +// Jobs that are not triggered by user actions should be exempted from cost +// control. +// +// For example, SQL stats compaction, span reconciler, and long-running +// migration jobs are not triggered by user actions, and so should be exempted. +// However, backup jobs are triggered by user BACKUP requests and should be +// costed. Even auto stats jobs should be costed, since the user could choose to +// disable auto stats. +// +// NOTE: A cost control exemption does not exclude CPU or Egress costs from +// accounting, since those cannot be attributed to individual jobs. +var DisablesTenantCostControl = func(opts *registerOptions) { + opts.disableTenantCostControl = true + opts.hasTenantCostControlOption = true +} + +// UsesTenantCostControl indicates that resumed jobs should include their +// Storage I/O costs in tenant accounting. See DisablesTenantCostControl comment +// for more details. +var UsesTenantCostControl = func(opts *registerOptions) { + opts.disableTenantCostControl = false + opts.hasTenantCostControlOption = true +} + +// registerOptions are passed to RegisterConstructor and control how a job +// resumer is created and configured. +type registerOptions struct { + // disableTenantCostControl is true when a job's Storage I/O costs should + // be excluded from tenant accounting. See DisablesTenantCostControl comment. + disableTenantCostControl bool + + // hasTenantCostControlOption is true if either DisablesTenantCostControl or + // UsesTenantCostControl was specified as an option. RegisterConstructor will + // panic if this is false. + hasTenantCostControlOption bool +} + // PauseRequester is an extension of Resumer which allows job implementers to inject // logic during the transaction which moves a job to PauseRequested. type PauseRequester interface { @@ -1103,10 +1153,28 @@ type JobResultsReporter interface { type Constructor func(job *Job, settings *cluster.Settings) Resumer var constructors = make(map[jobspb.Type]Constructor) +var options = make(map[jobspb.Type]registerOptions) // RegisterConstructor registers a Resumer constructor for a certain job type. -func RegisterConstructor(typ jobspb.Type, fn Constructor) { +// +// NOTE: You must pass either jobs.UsesTenantCostControl or +// jobs.DisablesTenantCostControl as an option, or this method will panic; see +// comments for these options for more details on how to use them. We want +// engineers to explicitly pass one of these options so that they will be +// prompted to think about which is appropriate for their new job type. +func RegisterConstructor(typ jobspb.Type, fn Constructor, opts ...RegisterOption) { constructors[typ] = fn + + // Apply all options to the struct. + var resOpts registerOptions + for _, opt := range opts { + opt(&resOpts) + } + if !resOpts.hasTenantCostControlOption { + panic("when registering a new job type, either jobs.DisablesTenantCostControl " + + "or jobs.UsesTenantCostControl is required; see comments for these options to learn more") + } + options[typ] = resOpts } func (r *Registry) createResumer(job *Job, settings *cluster.Settings) (Resumer, error) { diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 236084cae716..6b9b0de135c6 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -374,7 +374,7 @@ func TestGCDurationControl(t *testing.T) { jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, cs *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{} - }) + }, jobs.UsesTenantCostControl) s, sqlDB, kvDB := serverutils.StartServer(t, args) defer s.Stopper().Stop(ctx) registry := s.JobRegistry().(*jobs.Registry) @@ -454,7 +454,7 @@ func TestErrorsPopulatedOnRetry(t *testing.T) { OnResume: execFn, FailOrCancel: execFn, } - }) + }, jobs.UsesTenantCostControl) s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 6869adc042da..c8e9dc9e4bb2 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -323,7 +323,7 @@ func TestBatchJobsCreation(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) // Create a batch of job specifications. var records []*Record @@ -514,7 +514,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { return <-bti.errCh }, } - }) + }, UsesTenantCostControl) return cleanup } @@ -787,7 +787,7 @@ func TestExponentialBackoffSettings(t *testing.T) { // Create and run a dummy job. RegisterConstructor(jobspb.TypeImport, func(_ *Job, cs *cluster.Settings) Resumer { return FakeResumer{} - }) + }, UsesTenantCostControl) registry := s.JobRegistry().(*Registry) id := registry.MakeJobID() require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -936,7 +936,7 @@ func TestRunWithoutLoop(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) ctx := context.Background() settings := cluster.MakeTestingClusterSettings() @@ -1007,7 +1007,7 @@ func TestJobIdleness(t *testing.T) { return <-resumeErrChan }, } - }) + }, UsesTenantCostControl) currentlyIdle := r.MetricsStruct().JobMetrics[jobspb.TypeImport].CurrentlyIdle diff --git a/pkg/migration/migrationjob/migration_job.go b/pkg/migration/migrationjob/migration_job.go index f924f27df94c..b93217625b05 100644 --- a/pkg/migration/migrationjob/migration_job.go +++ b/pkg/migration/migrationjob/migration_job.go @@ -31,9 +31,12 @@ import ( ) func init() { + // Do not include the cost of long-running migrations in tenant accounting. + // NB: While the exemption excludes the cost of Storage I/O, it is not able + // to exclude the CPU cost. jobs.RegisterConstructor(jobspb.TypeMigration, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &resumer{j: job} - }) + }, jobs.DisablesTenantCostControl) } // NewRecord constructs a new jobs.Record for this migration. diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index d209312d16ea..1db9eba1900b 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -226,5 +226,8 @@ func init() { jobs.RegisterConstructor(jobspb.TypeAutoSpanConfigReconciliation, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &resumer{job: job} - }) + }, + // Do not include the cost of span reconciliation in tenant accounting. + jobs.DisablesTenantCostControl, + ) } diff --git a/pkg/sql/compact_sql_stats.go b/pkg/sql/compact_sql_stats.go index 7a3da622ff7f..101dbaccf592 100644 --- a/pkg/sql/compact_sql_stats.go +++ b/pkg/sql/compact_sql_stats.go @@ -248,12 +248,13 @@ func (e *scheduledSQLStatsCompactionExecutor) GetCreateScheduleStatement( } func init() { + // Do not include the cost of stats compaction in tenant accounting. jobs.RegisterConstructor(jobspb.TypeAutoSQLStatsCompaction, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &sqlStatsCompactionResumer{ job: job, st: settings, } - }) + }, jobs.DisablesTenantCostControl) jobs.RegisterScheduledJobExecutorFactory( tree.ScheduledSQLStatsCompactionExecutor.InternalName(), diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 1c0e14061d4a..ca3d0646e879 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -480,6 +481,12 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics { // Start starts the Server's background processing. func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { + // Exclude SQL background processing from cost accounting and limiting. + // NOTE: Only exclude background processing that is not under user control. + // If a user can opt in/out of some aspect of background processing, then it + // should be accounted for in their costs. + ctx = multitenant.WithTenantCostControlExemption(ctx) + s.sqlStats.Start(ctx, stopper) // reportedStats is periodically cleared to prevent too many SQL Stats diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index c4bdd560cd53..0f9441cd2aa4 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -700,6 +700,6 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &createStatsResumer{job: job} } - jobs.RegisterConstructor(jobspb.TypeCreateStats, createResumerFn) - jobs.RegisterConstructor(jobspb.TypeAutoCreateStats, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeCreateStats, createResumerFn, jobs.UsesTenantCostControl) + jobs.RegisterConstructor(jobspb.TypeAutoCreateStats, createResumerFn, jobs.UsesTenantCostControl) } diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 80c590304813..7098efd62726 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -340,5 +340,5 @@ func init() { job: job, } } - jobs.RegisterConstructor(jobspb.TypeSchemaChangeGC, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeSchemaChangeGC, createResumerFn, jobs.UsesTenantCostControl) } diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 4e7548683e29..00f68df22bae 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -1721,5 +1721,6 @@ func init() { settings: settings, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 8a8f0484aa96..5004e33eb0c6 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -4946,7 +4946,7 @@ func TestImportControlJobRBAC(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob { job, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), record) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index ae081e7535c3..057bb4de93f6 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2896,7 +2896,7 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &schemaChangeResumer{job: job} } - jobs.RegisterConstructor(jobspb.TypeSchemaChange, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeSchemaChange, createResumerFn, jobs.UsesTenantCostControl) } // queueCleanupJob checks if the completed schema change needs to start a diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index b25a86b4e067..4868443772ae 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -30,7 +30,7 @@ func init() { return &newSchemaChangeResumer{ job: job, } - }) + }, jobs.UsesTenantCostControl) } type newSchemaChangeResumer struct { diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index 6acb31f60083..4809a089e19c 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/clusterversion", "//pkg/gossip", "//pkg/kv", + "//pkg/multitenant", "//pkg/roachpb", "//pkg/security", "//pkg/settings", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index c40a36c99346..dd66d76af441 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -131,6 +132,11 @@ func NewRegistry( // Start will start the polling loop for the Registry. func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) { ctx, _ = stopper.WithCancelOnQuiesce(ctx) + + // Since background statement diagnostics collection is not under user + // control, exclude it from cost accounting and control. + ctx = multitenant.WithTenantCostControlExemption(ctx) + // NB: The only error that should occur here would be if the server were // shutting down so let's swallow it. _ = stopper.RunAsyncTask(ctx, "stmt-diag-poll", r.poll) diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 5de92ecb0956..1c2de5ecab72 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -749,6 +749,6 @@ func init() { job: job, st: settings, } - }) + }, jobs.UsesTenantCostControl) jobs.MakeRowLevelTTLMetricsHook = makeRowLevelTTLAggMetrics } diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index e72512e9d567..71a4cf341d26 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -1348,5 +1348,5 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &typeChangeResumer{job: job} } - jobs.RegisterConstructor(jobspb.TypeTypeSchemaChange, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeTypeSchemaChange, createResumerFn, jobs.UsesTenantCostControl) }