Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: multitenant: exclude background job reads/writes from tenant cost con… #85105

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,5 +936,6 @@ func init() {
job: job,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2554,5 +2554,6 @@ func init() {
settings: settings,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func init() {
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return &changefeedResumer{job: job}
},
jobs.UsesTenantCostControl,
)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ go_test(
"//pkg/cloud",
"//pkg/cloud/nodelocal",
"//pkg/cloud/nullsink",
"//pkg/jobs",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv/kvclient/kvtenant",
"//pkg/multitenant",
Expand All @@ -63,6 +65,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/stats",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand Down
95 changes: 95 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package tenantcostclient_test
import (
"bytes"
"context"
gosql "database/sql"
"fmt"
"io"
"io/ioutil"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
"github.com/cockroachdb/cockroach/pkg/cloud/nullsink"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand All @@ -43,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -968,6 +972,97 @@ func TestSQLLivenessExemption(t *testing.T) {
)
}

// TestScheduledJobsConsumption verifies that the scheduled jobs system itself
// does not consume RUs, but that the jobs it runs do consume RUs.
func TestScheduledJobsConsumption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false)
tenantcostclient.TargetPeriodSetting.Override(ctx, &st.SV, time.Millisecond*20)

hostServer, _, _ := serverutils.StartServer(t, base.TestServerArgs{Settings: st})
defer hostServer.Stopper().Stop(ctx)

testProvider := newTestProvider()

env := jobstest.NewJobSchedulerTestEnv(jobstest.UseSystemTables, timeutil.Now())
var zeroDuration time.Duration
var execSchedules func() error
var tenantServer serverutils.TestTenantInterface
var tenantDB *gosql.DB
tenantServer, tenantDB = serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Settings: st,
AllowSettingClusterSettings: true,
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
OverrideTokenBucketProvider: func(kvtenant.TokenBucketProvider) kvtenant.TokenBucketProvider {
return testProvider
},
},
TTL: &sql.TTLTestingKnobs{
// Don't wait until we can use a historical query.
AOSTDuration: &zeroDuration,
},
JobsTestingKnobs: &jobs.TestingKnobs{
JobSchedulerEnv: env,
TakeOverJobsScheduling: func(fn func(ctx context.Context, maxSchedules int64) error) {
execSchedules = func() error {
defer tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
return fn(ctx, 0)
}
},
IntervalOverrides: jobs.TestingIntervalOverrides{
// Force fast adoption and resumption.
Adopt: &zeroDuration,
RetryInitialDelay: &zeroDuration,
RetryMaxDelay: &zeroDuration,
},
},
},
})

r := sqlutils.MakeSQLRunner(tenantDB)
// Create a table with rows that expire after a TTL. This will trigger the
// creation of a TTL job.
r.Exec(t, "CREATE TABLE t (v INT PRIMARY KEY) WITH ("+
"ttl_expire_after = '1 microsecond', ttl_job_cron = '* * * * ?', ttl_delete_batch_size = 1)")
r.Exec(t, "INSERT INTO t SELECT x FROM generate_series(1,100) g(x)")
before := testProvider.waitForConsumption(t)

// Ensure the job system is not consuming RUs when scanning/claiming jobs.
tenantServer.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
env.AdvanceTime(24 * time.Hour)
time.Sleep(100 * time.Millisecond)
after := testProvider.waitForConsumption(t)
after.Sub(&before)
require.Zero(t, after.WriteBatches)
require.Zero(t, after.WriteBytes)
// Expect up to 3 batches for initial auto-stats query, schema catalog fill,
// and anything else that happens once during server startup but might not be
// done by this point.
require.LessOrEqual(t, after.ReadBatches, uint64(3))

// Make sure that at least 100 writes (deletes) are reported. The TTL job
// should not be exempt from cost control.
testutils.SucceedsSoon(t, func() error {
// Run all job schedules.
env.AdvanceTime(time.Minute)
require.NoError(t, execSchedules())

// Check consumption.
c := testProvider.waitForConsumption(t)
c.Sub(&before)
if c.WriteRequests < 100 {
return errors.New("no write requests reported")
}
return nil
})
}

// TestConsumption verifies consumption reporting from a tenant server process.
func TestConsumptionChangefeeds(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,7 @@ func init() {
func(job *jobs.Job,
settings *cluster.Settings) jobs.Resumer {
return &streamIngestionResumer{job: job}
})
},
jobs.UsesTenantCostControl,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,6 @@ func init() {
job: job,
}
},
jobs.UsesTenantCostControl,
)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,6 @@ func init() {
timer: ts.NewTimer(),
}
},
jobs.UsesTenantCostControl,
)
}
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestStreamReplicationProducerJob(t *testing.T) {
resumer: r,
revertingFinished: waitJobFinishReverting,
}
})
}, jobs.UsesTenantCostControl)
return mt,
func() {
in <- struct{}{} // Signals the timer that a new time is assigned to time source
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/debug_job_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestDebugJobTrace(t *testing.T) {
recordedSpanCh: recordedSpanCh,
}
},
jobs.UsesTenantCostControl,
)

// Create a "backup job" but we have overridden the resumer constructor above
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/base",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -318,6 +319,12 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
}
resumeCtx, cancel := r.makeCtx()

// If the job's type was registered to disable tenant cost control, then
// exclude the job's costs from tenant accounting.
if opts, ok := options[payload.Type()]; ok && opts.disableTenantCostControl {
resumeCtx = multitenant.WithTenantCostControlExemption(resumeCtx)
}

if alreadyAdopted := r.addAdoptedJob(jobID, s, cancel); alreadyAdopted {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestJobsControlForSchedules(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)

record := Record{
Description: "fake job",
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)

record := Record{
Description: "fake job",
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestJobControlByType(t *testing.T) {
return nil
},
}
})
}, UsesTenantCostControl)
}

for _, jobType := range allJobTypes {
Expand Down
6 changes: 6 additions & 0 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -547,6 +548,11 @@ func StartJobSchedulerDaemon(
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
) {
// Since the job scheduler system is not under user control, exclude it from
// from cost accounting and control. Individual jobs are not part of this
// exclusion.
ctx = multitenant.WithTenantCostControlExemption(ctx)

schedulerEnv := env
var daemonKnobs *TestingKnobs
if jobsKnobs, ok := cfg.TestingKnobs.(*TestingKnobs); ok {
Expand Down
Loading