Skip to content

Commit

Permalink
jobs: add virtual index for job_type in crdb_internal.jobs
Browse files Browse the repository at this point in the history
This change adds a virtual index on the `job_type` column
of `crdb_internal.jobs`. This change should make queries
on that table which filter on job type (such as `SHOW
CHANGEFEED JOBS`) more efficient.

Release note: None
  • Loading branch information
jayshrivastava committed Mar 14, 2023
1 parent 4fecf02 commit 85d878c
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 14 deletions.
13 changes: 7 additions & 6 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,19 +2025,20 @@ func TestShowAutomaticJobs(t *testing.T) {
}

for _, in := range rows {
// system.jobs is part proper SQL columns, part protobuf, so we can't use the
// row struct directly.
inPayload, err := protoutil.Marshal(&jobspb.Payload{
rawPayload := &jobspb.Payload{
UsernameProto: username.RootUserName().EncodeProto(),
Details: jobspb.WrapPayloadDetails(in.details),
})
}
// system.jobs is part proper SQL columns, part protobuf, so we can't use the
// row struct directly.
inPayload, err := protoutil.Marshal(rawPayload)
if err != nil {
t.Fatal(err)
}

sqlDB.Exec(t,
`INSERT INTO system.jobs (id, status, payload) VALUES ($1, $2, $3)`,
in.id, in.status, inPayload,
`INSERT INTO system.jobs (id, status, payload, job_type) VALUES ($1, $2, $3, $4)`,
in.id, in.status, inPayload, rawPayload.Type().String(),
)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1688,8 +1688,8 @@ func TestAdminAPIJobs(t *testing.T) {
t.Fatal(err)
}
sqlDB.Exec(t,
`INSERT INTO system.jobs (id, status, payload, progress, num_runs, last_run) VALUES ($1, $2, $3, $4, $5, $6)`,
job.id, job.status, payloadBytes, progressBytes, job.numRuns, job.lastRun,
`INSERT INTO system.jobs (id, status, payload, progress, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
job.id, job.status, payloadBytes, progressBytes, job.numRuns, job.lastRun, payload.Type().String(),
)
}

Expand Down
26 changes: 21 additions & 5 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,13 +1036,22 @@ const (
// Note that we are querying crdb_internal.system_jobs instead of system.jobs directly.
// The former has access control built in and will filter out jobs that the
// user is not allowed to see.
jobsQFrom = ` FROM crdb_internal.system_jobs`
jobsBackoffArgs = `(SELECT $1::FLOAT AS initial_delay, $2::FLOAT AS max_delay) args`
jobsStatusFilter = ` WHERE status = $3`
jobsQuery = jobsQSelect + `, last_run, COALESCE(num_runs, 0), ` + jobs.NextRunClause +
jobsQFrom = ` FROM crdb_internal.system_jobs`
jobsBackoffArgs = `(SELECT $1::FLOAT AS initial_delay, $2::FLOAT AS max_delay) args`
jobsStatusFilter = ` WHERE status = $3`
oldJobsTypeFilter = ` WHERE crdb_internal.job_payload_type(payload) = $3`
jobsTypeFilter = ` WHERE job_type = $3`
jobsQuery = jobsQSelect + `, last_run, COALESCE(num_runs, 0), ` + jobs.NextRunClause +
` as next_run` + jobsQFrom + ", " + jobsBackoffArgs
)

func getCRDBInternalJobsTableTypeFilter(ctx context.Context, version clusterversion.Handle) string {
if !version.IsActive(ctx, clusterversion.V23_1BackfillTypeColumnInJobsTable) {
return oldJobsTypeFilter
}
return jobsTypeFilter
}

// TODO(tbg): prefix with kv_.
var crdbInternalJobsTable = virtualSchemaTable{
schema: `
Expand All @@ -1069,7 +1078,8 @@ CREATE TABLE crdb_internal.jobs (
num_runs INT,
execution_errors STRING[],
execution_events JSONB,
INDEX(status)
INDEX(status),
INDEX(job_type)
)`,
comment: `decoded job metadata from crdb_internal.system_jobs (KV scan)`,
indexes: []virtualIndex{{
Expand All @@ -1078,6 +1088,12 @@ CREATE TABLE crdb_internal.jobs (
targetStatus := tree.MustBeDString(unwrappedConstraint)
return makeJobsTableRows(ctx, p, addRow, q, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay(), targetStatus)
},
}, {
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
q := jobsQuery + getCRDBInternalJobsTableTypeFilter(ctx, p.execCfg.Settings.Version)
targetStatus := tree.MustBeDString(unwrappedConstraint)
return makeJobsTableRows(ctx, p, addRow, q, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay(), targetStatus)
},
}},
populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
_, err := makeJobsTableRows(ctx, p, addRow, jobsQuery, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay())
Expand Down
Loading

0 comments on commit 85d878c

Please sign in to comment.