Skip to content

Commit

Permalink
Merge pull request cockroachdb#85105 from andy-kimball/backport22.1-8…
Browse files Browse the repository at this point in the history
…3005

release-22.1: multitenant: exclude background job reads/writes from tenant cost con…
  • Loading branch information
andy-kimball authored Jul 27, 2022
2 parents 050c27c + d7965f1 commit f162001
Show file tree
Hide file tree
Showing 32 changed files with 250 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,5 +936,6 @@ func init() {
job: job,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2554,5 +2554,6 @@ func init() {
settings: settings,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func init() {
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return &changefeedResumer{job: job}
},
jobs.UsesTenantCostControl,
)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ go_test(
"//pkg/cloud",
"//pkg/cloud/nodelocal",
"//pkg/cloud/nullsink",
"//pkg/jobs",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv/kvclient/kvtenant",
"//pkg/multitenant",
Expand All @@ -63,6 +65,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/stats",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand Down
95 changes: 95 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package tenantcostclient_test
import (
"bytes"
"context"
gosql "database/sql"
"fmt"
"io"
"io/ioutil"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
"github.com/cockroachdb/cockroach/pkg/cloud/nullsink"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand All @@ -43,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -968,6 +972,97 @@ func TestSQLLivenessExemption(t *testing.T) {
)
}

// TestScheduledJobsConsumption verifies that the scheduled jobs system itself
// does not consume RUs, but that the jobs it runs do consume RUs.
func TestScheduledJobsConsumption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false)
tenantcostclient.TargetPeriodSetting.Override(ctx, &st.SV, time.Millisecond*20)

hostServer, _, _ := serverutils.StartServer(t, base.TestServerArgs{Settings: st})
defer hostServer.Stopper().Stop(ctx)

testProvider := newTestProvider()

env := jobstest.NewJobSchedulerTestEnv(jobstest.UseSystemTables, timeutil.Now())
var zeroDuration time.Duration
var execSchedules func() error
var tenantServer serverutils.TestTenantInterface
var tenantDB *gosql.DB
tenantServer, tenantDB = serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Settings: st,
AllowSettingClusterSettings: true,
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
OverrideTokenBucketProvider: func(kvtenant.TokenBucketProvider) kvtenant.TokenBucketProvider {
return testProvider
},
},
TTL: &sql.TTLTestingKnobs{
// Don't wait until we can use a historical query.
AOSTDuration: &zeroDuration,
},
JobsTestingKnobs: &jobs.TestingKnobs{
JobSchedulerEnv: env,
TakeOverJobsScheduling: func(fn func(ctx context.Context, maxSchedules int64) error) {
execSchedules = func() error {
defer tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
return fn(ctx, 0)
}
},
IntervalOverrides: jobs.TestingIntervalOverrides{
// Force fast adoption and resumption.
Adopt: &zeroDuration,
RetryInitialDelay: &zeroDuration,
RetryMaxDelay: &zeroDuration,
},
},
},
})

r := sqlutils.MakeSQLRunner(tenantDB)
// Create a table with rows that expire after a TTL. This will trigger the
// creation of a TTL job.
r.Exec(t, "CREATE TABLE t (v INT PRIMARY KEY) WITH ("+
"ttl_expire_after = '1 microsecond', ttl_job_cron = '* * * * ?', ttl_delete_batch_size = 1)")
r.Exec(t, "INSERT INTO t SELECT x FROM generate_series(1,100) g(x)")
before := testProvider.waitForConsumption(t)

// Ensure the job system is not consuming RUs when scanning/claiming jobs.
tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
env.AdvanceTime(24 * time.Hour)
time.Sleep(100 * time.Millisecond)
after := testProvider.waitForConsumption(t)
after.Sub(&before)
require.Zero(t, after.WriteBatches)
require.Zero(t, after.WriteBytes)
// Expect up to 3 batches for initial auto-stats query, schema catalog fill,
// and anything else that happens once during server startup but might not be
// done by this point.
require.LessOrEqual(t, after.ReadBatches, uint64(3))

// Make sure that at least 100 writes (deletes) are reported. The TTL job
// should not be exempt from cost control.
testutils.SucceedsSoon(t, func() error {
// Run all job schedules.
env.AdvanceTime(time.Minute)
require.NoError(t, execSchedules())

// Check consumption.
c := testProvider.waitForConsumption(t)
c.Sub(&before)
if c.WriteRequests < 100 {
return errors.New("no write requests reported")
}
return nil
})
}

// TestConsumption verifies consumption reporting from a tenant server process.
func TestConsumptionChangefeeds(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,7 @@ func init() {
func(job *jobs.Job,
settings *cluster.Settings) jobs.Resumer {
return &streamIngestionResumer{job: job}
})
},
jobs.UsesTenantCostControl,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,6 @@ func init() {
job: job,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,6 @@ func init() {
timer: ts.NewTimer(),
}
},
jobs.UsesTenantCostControl,
)
}
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestStreamReplicationProducerJob(t *testing.T) {
resumer: r,
revertingFinished: waitJobFinishReverting,
}
})
}, jobs.UsesTenantCostControl)
return mt,
func() {
in <- struct{}{} // Signals the timer that a new time is assigned to time source
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/debug_job_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestDebugJobTrace(t *testing.T) {
recordedSpanCh: recordedSpanCh,
}
},
jobs.UsesTenantCostControl,
)

// Create a "backup job" but we have overridden the resumer constructor above
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/base",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -318,6 +319,12 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
}
resumeCtx, cancel := r.makeCtx()

// If the job's type was registered to disable tenant cost control, then
// exclude the job's costs from tenant accounting.
if opts, ok := options[payload.Type()]; ok && opts.disableTenantCostControl {
resumeCtx = multitenant.WithTenantCostControlExemption(resumeCtx)
}

if alreadyAdopted := r.addAdoptedJob(jobID, s, cancel); alreadyAdopted {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestJobsControlForSchedules(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)

record := Record{
Description: "fake job",
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)

record := Record{
Description: "fake job",
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestJobControlByType(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)
}

for _, jobType := range allJobTypes {
Expand Down
6 changes: 6 additions & 0 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -547,6 +548,11 @@ func StartJobSchedulerDaemon(
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
) {
// Since the job scheduler system is not under user control, exclude it from
// from cost accounting and control. Individual jobs are not part of this
// exclusion.
ctx = multitenant.WithTenantCostControlExemption(ctx)

schedulerEnv := env
var daemonKnobs *TestingKnobs
if jobsKnobs, ok := cfg.TestingKnobs.(*TestingKnobs); ok {
Expand Down
Loading

0 comments on commit f162001

Please sign in to comment.