Skip to content

Commit

Permalink
multitenant: exclude background job reads/writes from tenant cost con…
Browse files Browse the repository at this point in the history
…trol

Previously, reads and writes issued to the Storage layer by the job system
and by background jobs were included in tenant accounting. This is not ideal,
since we make a best effort to exclude operations from costing if they are
not under user control. This commit excludes the activities of the job system
from cost control, as well as the following jobs:

  - Long-running migration job
  - SQL stats compaction
  - Span config reconciler

None of these jobs are triggered by user actions and none can be disabled by
users.

NOTE: A cost control exemption does not exclude CPU or Egress costs from
accounting, since those cannot be attributed to individual jobs.

Release note: None
  • Loading branch information
andy-kimball committed Jul 27, 2022
1 parent 672da11 commit d7965f1
Show file tree
Hide file tree
Showing 32 changed files with 250 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,5 +936,6 @@ func init() {
job: job,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2554,5 +2554,6 @@ func init() {
settings: settings,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func init() {
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return &changefeedResumer{job: job}
},
jobs.UsesTenantCostControl,
)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ go_test(
"//pkg/cloud",
"//pkg/cloud/nodelocal",
"//pkg/cloud/nullsink",
"//pkg/jobs",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv/kvclient/kvtenant",
"//pkg/multitenant",
Expand All @@ -63,6 +65,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/stats",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand Down
95 changes: 95 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package tenantcostclient_test
import (
"bytes"
"context"
gosql "database/sql"
"fmt"
"io"
"io/ioutil"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
"github.com/cockroachdb/cockroach/pkg/cloud/nullsink"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand All @@ -43,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -968,6 +972,97 @@ func TestSQLLivenessExemption(t *testing.T) {
)
}

// TestScheduledJobsConsumption verifies that the scheduled jobs system itself
// does not consume RUs, but that the jobs it runs do consume RUs.
func TestScheduledJobsConsumption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false)
tenantcostclient.TargetPeriodSetting.Override(ctx, &st.SV, time.Millisecond*20)

hostServer, _, _ := serverutils.StartServer(t, base.TestServerArgs{Settings: st})
defer hostServer.Stopper().Stop(ctx)

testProvider := newTestProvider()

env := jobstest.NewJobSchedulerTestEnv(jobstest.UseSystemTables, timeutil.Now())
var zeroDuration time.Duration
var execSchedules func() error
var tenantServer serverutils.TestTenantInterface
var tenantDB *gosql.DB
tenantServer, tenantDB = serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Settings: st,
AllowSettingClusterSettings: true,
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
OverrideTokenBucketProvider: func(kvtenant.TokenBucketProvider) kvtenant.TokenBucketProvider {
return testProvider
},
},
TTL: &sql.TTLTestingKnobs{
// Don't wait until we can use a historical query.
AOSTDuration: &zeroDuration,
},
JobsTestingKnobs: &jobs.TestingKnobs{
JobSchedulerEnv: env,
TakeOverJobsScheduling: func(fn func(ctx context.Context, maxSchedules int64) error) {
execSchedules = func() error {
defer tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
return fn(ctx, 0)
}
},
IntervalOverrides: jobs.TestingIntervalOverrides{
// Force fast adoption and resumption.
Adopt: &zeroDuration,
RetryInitialDelay: &zeroDuration,
RetryMaxDelay: &zeroDuration,
},
},
},
})

r := sqlutils.MakeSQLRunner(tenantDB)
// Create a table with rows that expire after a TTL. This will trigger the
// creation of a TTL job.
r.Exec(t, "CREATE TABLE t (v INT PRIMARY KEY) WITH ("+
"ttl_expire_after = '1 microsecond', ttl_job_cron = '* * * * ?', ttl_delete_batch_size = 1)")
r.Exec(t, "INSERT INTO t SELECT x FROM generate_series(1,100) g(x)")
before := testProvider.waitForConsumption(t)

// Ensure the job system is not consuming RUs when scanning/claiming jobs.
tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
env.AdvanceTime(24 * time.Hour)
time.Sleep(100 * time.Millisecond)
after := testProvider.waitForConsumption(t)
after.Sub(&before)
require.Zero(t, after.WriteBatches)
require.Zero(t, after.WriteBytes)
// Expect up to 3 batches for initial auto-stats query, schema catalog fill,
// and anything else that happens once during server startup but might not be
// done by this point.
require.LessOrEqual(t, after.ReadBatches, uint64(3))

// Make sure that at least 100 writes (deletes) are reported. The TTL job
// should not be exempt from cost control.
testutils.SucceedsSoon(t, func() error {
// Run all job schedules.
env.AdvanceTime(time.Minute)
require.NoError(t, execSchedules())

// Check consumption.
c := testProvider.waitForConsumption(t)
c.Sub(&before)
if c.WriteRequests < 100 {
return errors.New("no write requests reported")
}
return nil
})
}

// TestConsumption verifies consumption reporting from a tenant server process.
func TestConsumptionChangefeeds(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,7 @@ func init() {
func(job *jobs.Job,
settings *cluster.Settings) jobs.Resumer {
return &streamIngestionResumer{job: job}
})
},
jobs.UsesTenantCostControl,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,6 @@ func init() {
job: job,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,6 @@ func init() {
timer: ts.NewTimer(),
}
},
jobs.UsesTenantCostControl,
)
}
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestStreamReplicationProducerJob(t *testing.T) {
resumer: r,
revertingFinished: waitJobFinishReverting,
}
})
}, jobs.UsesTenantCostControl)
return mt,
func() {
in <- struct{}{} // Signals the timer that a new time is assigned to time source
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/debug_job_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestDebugJobTrace(t *testing.T) {
recordedSpanCh: recordedSpanCh,
}
},
jobs.UsesTenantCostControl,
)

// Create a "backup job" but we have overridden the resumer constructor above
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/base",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -318,6 +319,12 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
}
resumeCtx, cancel := r.makeCtx()

// If the job's type was registered to disable tenant cost control, then
// exclude the job's costs from tenant accounting.
if opts, ok := options[payload.Type()]; ok && opts.disableTenantCostControl {
resumeCtx = multitenant.WithTenantCostControlExemption(resumeCtx)
}

if alreadyAdopted := r.addAdoptedJob(jobID, s, cancel); alreadyAdopted {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestJobsControlForSchedules(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)

record := Record{
Description: "fake job",
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)

record := Record{
Description: "fake job",
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestJobControlByType(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)
}

for _, jobType := range allJobTypes {
Expand Down
6 changes: 6 additions & 0 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -547,6 +548,11 @@ func StartJobSchedulerDaemon(
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
) {
// Since the job scheduler system is not under user control, exclude it from
// from cost accounting and control. Individual jobs are not part of this
// exclusion.
ctx = multitenant.WithTenantCostControlExemption(ctx)

schedulerEnv := env
var daemonKnobs *TestingKnobs
if jobsKnobs, ok := cfg.TestingKnobs.(*TestingKnobs); ok {
Expand Down
Loading

0 comments on commit d7965f1

Please sign in to comment.