From d7965f132608bee4e326e3cb2938c845981717bd Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Thu, 16 Jun 2022 19:25:06 -0700 Subject: [PATCH] multitenant: exclude background job reads/writes from tenant cost control Previously, reads and writes issued to the Storage layer by the job system and by background jobs were included in tenant accounting. This is not ideal, since we make a best effort to exclude operations from costing if they are not under user control. This commit excludes the activities of the job system from cost control, as well as the following jobs: - Long-running migration job - SQL stats compaction - Span config reconciler None of these jobs are triggered by user actions and none can be disabled by users. NOTE: A cost control exemption does not exclude CPU or Egress costs from accounting, since those cannot be attributed to individual jobs. Release note: None --- pkg/ccl/backupccl/backup_job.go | 1 + pkg/ccl/backupccl/restore_job.go | 1 + pkg/ccl/changefeedccl/changefeed_stmt.go | 1 + .../tenantcostclient/BUILD.bazel | 3 + .../tenantcostclient/tenant_side_test.go | 95 +++++++++++++++++++ .../streamingest/stream_ingestion_job.go | 4 +- .../streamingest/stream_ingestion_planning.go | 1 + .../streamproducer/producer_job.go | 1 + .../streamproducer/producer_job_test.go | 2 +- pkg/cli/debug_job_trace_test.go | 1 + pkg/jobs/BUILD.bazel | 1 + pkg/jobs/adopt.go | 7 ++ pkg/jobs/delegate_control_test.go | 6 +- pkg/jobs/job_scheduler.go | 6 ++ pkg/jobs/jobs_test.go | 32 +++---- pkg/jobs/registry.go | 70 +++++++++++++- pkg/jobs/registry_external_test.go | 4 +- pkg/jobs/registry_test.go | 10 +- pkg/migration/migrationjob/migration_job.go | 5 +- pkg/spanconfig/spanconfigjob/job.go | 5 +- pkg/sql/compact_sql_stats.go | 3 +- pkg/sql/conn_executor.go | 7 ++ pkg/sql/create_stats.go | 4 +- pkg/sql/gcjob/gc_job.go | 2 +- pkg/sql/importer/import_job.go | 1 + pkg/sql/importer/import_stmt_test.go | 2 +- pkg/sql/schema_changer.go | 2 +- pkg/sql/schemachanger/scjob/job.go | 2 +- pkg/sql/stmtdiagnostics/BUILD.bazel | 1 + .../stmtdiagnostics/statement_diagnostics.go | 6 ++ pkg/sql/ttl/ttljob/ttljob.go | 2 +- pkg/sql/type_change.go | 2 +- 32 files changed, 250 insertions(+), 40 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 6c7c3bb35bcb..9fb60f4e210e 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -936,5 +936,6 @@ func init() { job: job, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 91fc907acdaf..a28c6672b46a 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2554,5 +2554,6 @@ func init() { settings: settings, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a819a7948e91..0ed4949405e7 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -69,6 +69,7 @@ func init() { func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return &changefeedResumer{job: job} }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel index feac5d043624..d9344d17d9b4 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel +++ b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel @@ -49,6 +49,8 @@ go_test( "//pkg/cloud", "//pkg/cloud/nodelocal", "//pkg/cloud/nullsink", + "//pkg/jobs", + "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/kv/kvclient/kvtenant", "//pkg/multitenant", @@ -63,6 +65,7 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slinstance", + "//pkg/sql/stats", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index 4c307fbcbc6d..85a425ee852e 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -11,6 +11,7 @@ package tenantcostclient_test import ( "bytes" "context" + gosql "database/sql" "fmt" "io" "io/ioutil" @@ -30,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" "github.com/cockroachdb/cockroach/pkg/cloud/nullsink" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/multitenant" @@ -43,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" + "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -968,6 +972,97 @@ func TestSQLLivenessExemption(t *testing.T) { ) } +// TestScheduledJobsConsumption verifies that the scheduled jobs system itself +// does not consume RUs, but that the jobs it runs do consume RUs. +func TestScheduledJobsConsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) + tenantcostclient.TargetPeriodSetting.Override(ctx, &st.SV, time.Millisecond*20) + + hostServer, _, _ := serverutils.StartServer(t, base.TestServerArgs{Settings: st}) + defer hostServer.Stopper().Stop(ctx) + + testProvider := newTestProvider() + + env := jobstest.NewJobSchedulerTestEnv(jobstest.UseSystemTables, timeutil.Now()) + var zeroDuration time.Duration + var execSchedules func() error + var tenantServer serverutils.TestTenantInterface + var tenantDB *gosql.DB + tenantServer, tenantDB = serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ + TenantID: serverutils.TestTenantID(), + Settings: st, + AllowSettingClusterSettings: true, + TestingKnobs: base.TestingKnobs{ + TenantTestingKnobs: &sql.TenantTestingKnobs{ + OverrideTokenBucketProvider: func(kvtenant.TokenBucketProvider) kvtenant.TokenBucketProvider { + return testProvider + }, + }, + TTL: &sql.TTLTestingKnobs{ + // Don't wait until we can use a historical query. + AOSTDuration: &zeroDuration, + }, + JobsTestingKnobs: &jobs.TestingKnobs{ + JobSchedulerEnv: env, + TakeOverJobsScheduling: func(fn func(ctx context.Context, maxSchedules int64) error) { + execSchedules = func() error { + defer tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue() + return fn(ctx, 0) + } + }, + IntervalOverrides: jobs.TestingIntervalOverrides{ + // Force fast adoption and resumption. + Adopt: &zeroDuration, + RetryInitialDelay: &zeroDuration, + RetryMaxDelay: &zeroDuration, + }, + }, + }, + }) + + r := sqlutils.MakeSQLRunner(tenantDB) + // Create a table with rows that expire after a TTL. This will trigger the + // creation of a TTL job. + r.Exec(t, "CREATE TABLE t (v INT PRIMARY KEY) WITH ("+ + "ttl_expire_after = '1 microsecond', ttl_job_cron = '* * * * ?', ttl_delete_batch_size = 1)") + r.Exec(t, "INSERT INTO t SELECT x FROM generate_series(1,100) g(x)") + before := testProvider.waitForConsumption(t) + + // Ensure the job system is not consuming RUs when scanning/claiming jobs. + tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue() + env.AdvanceTime(24 * time.Hour) + time.Sleep(100 * time.Millisecond) + after := testProvider.waitForConsumption(t) + after.Sub(&before) + require.Zero(t, after.WriteBatches) + require.Zero(t, after.WriteBytes) + // Expect up to 3 batches for initial auto-stats query, schema catalog fill, + // and anything else that happens once during server startup but might not be + // done by this point. + require.LessOrEqual(t, after.ReadBatches, uint64(3)) + + // Make sure that at least 100 writes (deletes) are reported. The TTL job + // should not be exempt from cost control. + testutils.SucceedsSoon(t, func() error { + // Run all job schedules. + env.AdvanceTime(time.Minute) + require.NoError(t, execSchedules()) + + // Check consumption. + c := testProvider.waitForConsumption(t) + c.Sub(&before) + if c.WriteRequests < 100 { + return errors.New("no write requests reported") + } + return nil + }) +} + // TestConsumption verifies consumption reporting from a tenant server process. func TestConsumptionChangefeeds(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index eeebe550c20e..ca0364c6ea0d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -190,5 +190,7 @@ func init() { func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &streamIngestionResumer{job: job} - }) + }, + jobs.UsesTenantCostControl, + ) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index a54e2ef8adb3..9da9d7eebc38 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -206,5 +206,6 @@ func init() { job: job, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go index def40e447ac7..9789e5fd1ca7 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go @@ -120,5 +120,6 @@ func init() { timer: ts.NewTimer(), } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go index 89bf9dce2cd7..f0313f0b84b3 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go @@ -131,7 +131,7 @@ func TestStreamReplicationProducerJob(t *testing.T) { resumer: r, revertingFinished: waitJobFinishReverting, } - }) + }, jobs.UsesTenantCostControl) return mt, func() { in <- struct{}{} // Signals the timer that a new time is assigned to time source diff --git a/pkg/cli/debug_job_trace_test.go b/pkg/cli/debug_job_trace_test.go index 480ee4dabfd8..4ac3741b4f82 100644 --- a/pkg/cli/debug_job_trace_test.go +++ b/pkg/cli/debug_job_trace_test.go @@ -96,6 +96,7 @@ func TestDebugJobTrace(t *testing.T) { recordedSpanCh: recordedSpanCh, } }, + jobs.UsesTenantCostControl, ) // Create a "backup job" but we have overridden the resumer constructor above diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 3c195bd0d1a0..9f65f8f93152 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/base", "//pkg/jobs/jobspb", "//pkg/kv", + "//pkg/multitenant", "//pkg/roachpb", "//pkg/scheduledjobs", "//pkg/security", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 9b20dee0c46b..4f6229f165b0 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -318,6 +319,12 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven } resumeCtx, cancel := r.makeCtx() + // If the job's type was registered to disable tenant cost control, then + // exclude the job's costs from tenant accounting. + if opts, ok := options[payload.Type()]; ok && opts.disableTenantCostControl { + resumeCtx = multitenant.WithTenantCostControlExemption(resumeCtx) + } + if alreadyAdopted := r.addAdoptedJob(jobID, s, cancel); alreadyAdopted { return nil } diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 56d10dc1fe0c..612d5dcfed25 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -176,7 +176,7 @@ func TestJobsControlForSchedules(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) record := Record{ Description: "fake job", @@ -289,7 +289,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) record := Record{ Description: "fake job", @@ -394,7 +394,7 @@ func TestJobControlByType(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) } for _, jobType := range allJobTypes { diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index b94abbc4a04f..820dbdbd8714 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security" @@ -547,6 +548,11 @@ func StartJobSchedulerDaemon( cfg *scheduledjobs.JobExecutionConfig, env scheduledjobs.JobSchedulerEnv, ) { + // Since the job scheduler system is not under user control, exclude it from + // from cost accounting and control. Individual jobs are not part of this + // exclusion. + ctx = multitenant.WithTenantCostControlExemption(ctx) + schedulerEnv := env var daemonKnobs *TestingKnobs if jobsKnobs, ok := cfg.TestingKnobs.(*TestingKnobs); ok { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 44d7dbb5a73b..b0925bc51c4b 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -323,7 +323,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) { return rts.onPauseRequest(ctx, execCfg, txn, progress) }, } - }) + }, jobs.UsesTenantCostControl) } func (rts *registryTestSuite) tearDown() { @@ -1216,7 +1216,7 @@ func TestJobLifecycle(t *testing.T) { } }, } - }) + }, jobs.UsesTenantCostControl) startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) { beforeTime := timeutil.Now() @@ -1760,7 +1760,7 @@ func TestJobLifecycle(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) jobID := registry.MakeJobID() record := jobs.Record{ @@ -2212,7 +2212,7 @@ func TestShowJobWhenComplete(t *testing.T) { } }, } - }) + }, jobs.UsesTenantCostControl) type row struct { id jobspb.JobID @@ -2373,7 +2373,7 @@ func TestJobInTxn(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) // Piggy back on RESTORE to be able to create a failing test job. sql.AddPlanHook( "test", @@ -2404,7 +2404,7 @@ func TestJobInTxn(t *testing.T) { return errors.New("RESTORE failed") }, } - }) + }, jobs.UsesTenantCostControl) t.Run("rollback txn", func(t *testing.T) { start := timeutil.Now() @@ -2499,7 +2499,7 @@ func TestStartableJobMixedVersion(t *testing.T) { jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{} - }) + }, jobs.UsesTenantCostControl) var j *jobs.StartableJob jobID := jr.MakeJobID() require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { @@ -2542,7 +2542,7 @@ func TestStartableJob(t *testing.T) { return resumeFunc.Load().(func(ctx context.Context) error)(ctx) }, } - }) + }, jobs.UsesTenantCostControl) woodyP, _ := security.MakeSQLUsernameFromUserInput("Woody Pride", security.UsernameValidation) rec := jobs.Record{ Description: "There's a snake in my boot!", @@ -2737,7 +2737,7 @@ func TestStartableJobTxnRetry(t *testing.T) { jr := s.JobRegistry().(*jobs.Registry) jobs.RegisterConstructor(jobspb.TypeRestore, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{} - }) + }, jobs.UsesTenantCostControl) rec := jobs.Record{ Details: jobspb.RestoreDetails{}, Progress: jobspb.RestoreProgress{}, @@ -2785,7 +2785,7 @@ func TestUnmigratedSchemaChangeJobs(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) select { case <-resuming: t.Fatal("job was resumed") @@ -2845,7 +2845,7 @@ func TestRegistryTestingNudgeAdoptionQueue(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) before := timeutil.Now() jobID := registry.MakeJobID() _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */) @@ -2914,10 +2914,10 @@ func TestMetrics(t *testing.T) { } jobs.RegisterConstructor(jobspb.TypeBackup, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return res - }) + }, jobs.UsesTenantCostControl) jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return res - }) + }, jobs.UsesTenantCostControl) setup := func(t *testing.T) ( s serverutils.TestServerInterface, db *gosql.DB, r *jobs.Registry, cleanup func(), ) { @@ -3135,7 +3135,7 @@ func TestLoseLeaseDuringExecution(t *testing.T) { return err }, } - }) + }, jobs.UsesTenantCostControl) _, err := registry.CreateJobWithTxn(ctx, rec, registry.MakeJobID(), nil) require.NoError(t, err) @@ -3189,7 +3189,7 @@ func TestPauseReason(t *testing.T) { } }, } - }) + }, jobs.UsesTenantCostControl) rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, @@ -3406,7 +3406,7 @@ func TestPausepoints(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 43ff2bb4ed2d..4e616ff2633a 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -703,6 +704,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) { // jobs if it observes a failure. Otherwise it starts all the main daemons of // registry that poll the jobs table and start/cancel/gc jobs. func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { + // Since the job polling system is outside user control, exclude it from cost + // accounting and control. Individual jobs are not part of this exclusion. + ctx = multitenant.WithTenantCostControlExemption(ctx) + wrapWithSession := func(f withSessionFunc) func(ctx context.Context) { return func(ctx context.Context) { r.withSession(ctx, f) } } @@ -1076,6 +1081,51 @@ type Resumer interface { OnFailOrCancel(ctx context.Context, execCtx interface{}) error } +// RegisterOption is the template for options passed to the RegisterConstructor +// function. +type RegisterOption func(opts *registerOptions) + +// DisablesTenantCostControl allows job implementors to exclude their job's +// Storage I/O costs (i.e. from reads/writes) from tenant accounting, based on +// this principle: +// +// Jobs that are not triggered by user actions should be exempted from cost +// control. +// +// For example, SQL stats compaction, span reconciler, and long-running +// migration jobs are not triggered by user actions, and so should be exempted. +// However, backup jobs are triggered by user BACKUP requests and should be +// costed. Even auto stats jobs should be costed, since the user could choose to +// disable auto stats. +// +// NOTE: A cost control exemption does not exclude CPU or Egress costs from +// accounting, since those cannot be attributed to individual jobs. +var DisablesTenantCostControl = func(opts *registerOptions) { + opts.disableTenantCostControl = true + opts.hasTenantCostControlOption = true +} + +// UsesTenantCostControl indicates that resumed jobs should include their +// Storage I/O costs in tenant accounting. See DisablesTenantCostControl comment +// for more details. +var UsesTenantCostControl = func(opts *registerOptions) { + opts.disableTenantCostControl = false + opts.hasTenantCostControlOption = true +} + +// registerOptions are passed to RegisterConstructor and control how a job +// resumer is created and configured. +type registerOptions struct { + // disableTenantCostControl is true when a job's Storage I/O costs should + // be excluded from tenant accounting. See DisablesTenantCostControl comment. + disableTenantCostControl bool + + // hasTenantCostControlOption is true if either DisablesTenantCostControl or + // UsesTenantCostControl was specified as an option. RegisterConstructor will + // panic if this is false. + hasTenantCostControlOption bool +} + // PauseRequester is an extension of Resumer which allows job implementers to inject // logic during the transaction which moves a job to PauseRequested. type PauseRequester interface { @@ -1103,10 +1153,28 @@ type JobResultsReporter interface { type Constructor func(job *Job, settings *cluster.Settings) Resumer var constructors = make(map[jobspb.Type]Constructor) +var options = make(map[jobspb.Type]registerOptions) // RegisterConstructor registers a Resumer constructor for a certain job type. -func RegisterConstructor(typ jobspb.Type, fn Constructor) { +// +// NOTE: You must pass either jobs.UsesTenantCostControl or +// jobs.DisablesTenantCostControl as an option, or this method will panic; see +// comments for these options for more details on how to use them. We want +// engineers to explicitly pass one of these options so that they will be +// prompted to think about which is appropriate for their new job type. +func RegisterConstructor(typ jobspb.Type, fn Constructor, opts ...RegisterOption) { constructors[typ] = fn + + // Apply all options to the struct. + var resOpts registerOptions + for _, opt := range opts { + opt(&resOpts) + } + if !resOpts.hasTenantCostControlOption { + panic("when registering a new job type, either jobs.DisablesTenantCostControl " + + "or jobs.UsesTenantCostControl is required; see comments for these options to learn more") + } + options[typ] = resOpts } func (r *Registry) createResumer(job *Job, settings *cluster.Settings) (Resumer, error) { diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 236084cae716..6b9b0de135c6 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -374,7 +374,7 @@ func TestGCDurationControl(t *testing.T) { jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, cs *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{} - }) + }, jobs.UsesTenantCostControl) s, sqlDB, kvDB := serverutils.StartServer(t, args) defer s.Stopper().Stop(ctx) registry := s.JobRegistry().(*jobs.Registry) @@ -454,7 +454,7 @@ func TestErrorsPopulatedOnRetry(t *testing.T) { OnResume: execFn, FailOrCancel: execFn, } - }) + }, jobs.UsesTenantCostControl) s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 6869adc042da..c8e9dc9e4bb2 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -323,7 +323,7 @@ func TestBatchJobsCreation(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) // Create a batch of job specifications. var records []*Record @@ -514,7 +514,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { return <-bti.errCh }, } - }) + }, UsesTenantCostControl) return cleanup } @@ -787,7 +787,7 @@ func TestExponentialBackoffSettings(t *testing.T) { // Create and run a dummy job. RegisterConstructor(jobspb.TypeImport, func(_ *Job, cs *cluster.Settings) Resumer { return FakeResumer{} - }) + }, UsesTenantCostControl) registry := s.JobRegistry().(*Registry) id := registry.MakeJobID() require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -936,7 +936,7 @@ func TestRunWithoutLoop(t *testing.T) { return nil }, } - }) + }, UsesTenantCostControl) ctx := context.Background() settings := cluster.MakeTestingClusterSettings() @@ -1007,7 +1007,7 @@ func TestJobIdleness(t *testing.T) { return <-resumeErrChan }, } - }) + }, UsesTenantCostControl) currentlyIdle := r.MetricsStruct().JobMetrics[jobspb.TypeImport].CurrentlyIdle diff --git a/pkg/migration/migrationjob/migration_job.go b/pkg/migration/migrationjob/migration_job.go index f924f27df94c..b93217625b05 100644 --- a/pkg/migration/migrationjob/migration_job.go +++ b/pkg/migration/migrationjob/migration_job.go @@ -31,9 +31,12 @@ import ( ) func init() { + // Do not include the cost of long-running migrations in tenant accounting. + // NB: While the exemption excludes the cost of Storage I/O, it is not able + // to exclude the CPU cost. jobs.RegisterConstructor(jobspb.TypeMigration, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &resumer{j: job} - }) + }, jobs.DisablesTenantCostControl) } // NewRecord constructs a new jobs.Record for this migration. diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index d209312d16ea..1db9eba1900b 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -226,5 +226,8 @@ func init() { jobs.RegisterConstructor(jobspb.TypeAutoSpanConfigReconciliation, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &resumer{job: job} - }) + }, + // Do not include the cost of span reconciliation in tenant accounting. + jobs.DisablesTenantCostControl, + ) } diff --git a/pkg/sql/compact_sql_stats.go b/pkg/sql/compact_sql_stats.go index 7a3da622ff7f..101dbaccf592 100644 --- a/pkg/sql/compact_sql_stats.go +++ b/pkg/sql/compact_sql_stats.go @@ -248,12 +248,13 @@ func (e *scheduledSQLStatsCompactionExecutor) GetCreateScheduleStatement( } func init() { + // Do not include the cost of stats compaction in tenant accounting. jobs.RegisterConstructor(jobspb.TypeAutoSQLStatsCompaction, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &sqlStatsCompactionResumer{ job: job, st: settings, } - }) + }, jobs.DisablesTenantCostControl) jobs.RegisterScheduledJobExecutorFactory( tree.ScheduledSQLStatsCompactionExecutor.InternalName(), diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 1c0e14061d4a..ca3d0646e879 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -480,6 +481,12 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics { // Start starts the Server's background processing. func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { + // Exclude SQL background processing from cost accounting and limiting. + // NOTE: Only exclude background processing that is not under user control. + // If a user can opt in/out of some aspect of background processing, then it + // should be accounted for in their costs. + ctx = multitenant.WithTenantCostControlExemption(ctx) + s.sqlStats.Start(ctx, stopper) // reportedStats is periodically cleared to prevent too many SQL Stats diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index c4bdd560cd53..0f9441cd2aa4 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -700,6 +700,6 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &createStatsResumer{job: job} } - jobs.RegisterConstructor(jobspb.TypeCreateStats, createResumerFn) - jobs.RegisterConstructor(jobspb.TypeAutoCreateStats, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeCreateStats, createResumerFn, jobs.UsesTenantCostControl) + jobs.RegisterConstructor(jobspb.TypeAutoCreateStats, createResumerFn, jobs.UsesTenantCostControl) } diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 80c590304813..7098efd62726 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -340,5 +340,5 @@ func init() { job: job, } } - jobs.RegisterConstructor(jobspb.TypeSchemaChangeGC, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeSchemaChangeGC, createResumerFn, jobs.UsesTenantCostControl) } diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 4e7548683e29..00f68df22bae 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -1721,5 +1721,6 @@ func init() { settings: settings, } }, + jobs.UsesTenantCostControl, ) } diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 8a8f0484aa96..5004e33eb0c6 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -4946,7 +4946,7 @@ func TestImportControlJobRBAC(t *testing.T) { return nil }, } - }) + }, jobs.UsesTenantCostControl) startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob { job, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), record) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index ae081e7535c3..057bb4de93f6 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2896,7 +2896,7 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &schemaChangeResumer{job: job} } - jobs.RegisterConstructor(jobspb.TypeSchemaChange, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeSchemaChange, createResumerFn, jobs.UsesTenantCostControl) } // queueCleanupJob checks if the completed schema change needs to start a diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index b25a86b4e067..4868443772ae 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -30,7 +30,7 @@ func init() { return &newSchemaChangeResumer{ job: job, } - }) + }, jobs.UsesTenantCostControl) } type newSchemaChangeResumer struct { diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index 6acb31f60083..4809a089e19c 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/clusterversion", "//pkg/gossip", "//pkg/kv", + "//pkg/multitenant", "//pkg/roachpb", "//pkg/security", "//pkg/settings", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index c40a36c99346..dd66d76af441 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -131,6 +132,11 @@ func NewRegistry( // Start will start the polling loop for the Registry. func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) { ctx, _ = stopper.WithCancelOnQuiesce(ctx) + + // Since background statement diagnostics collection is not under user + // control, exclude it from cost accounting and control. + ctx = multitenant.WithTenantCostControlExemption(ctx) + // NB: The only error that should occur here would be if the server were // shutting down so let's swallow it. _ = stopper.RunAsyncTask(ctx, "stmt-diag-poll", r.poll) diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 5de92ecb0956..1c2de5ecab72 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -749,6 +749,6 @@ func init() { job: job, st: settings, } - }) + }, jobs.UsesTenantCostControl) jobs.MakeRowLevelTTLMetricsHook = makeRowLevelTTLAggMetrics } diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index e72512e9d567..71a4cf341d26 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -1348,5 +1348,5 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &typeChangeResumer{job: job} } - jobs.RegisterConstructor(jobspb.TypeTypeSchemaChange, createResumerFn) + jobs.RegisterConstructor(jobspb.TypeTypeSchemaChange, createResumerFn, jobs.UsesTenantCostControl) }