Skip to content

Commit

Permalink
jobs: use job_type column for RunningJobExists queries
Browse files Browse the repository at this point in the history
Avoiding the virtual table avoids the join on infos.

Release note: none.
Epic: none.
  • Loading branch information
dt committed Apr 25, 2023
1 parent 2d4f44b commit ef850fb
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
73 changes: 68 additions & 5 deletions pkg/jobs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,85 @@ package jobs

import (
"context"
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)

// RunningJobExists checks that whether there are any job of the given types in
// the pending, running, or paused status, optionally ignoring the job with the
// ID specified by ignoreJobID, and any jobs created after it, if it is not
// InvalidJobID.
// RunningJobExists checks that whether there are any job of the given types
// in the pending, running, or paused status, optionally ignoring the job with
// the ID specified by ignoreJobID as well as any jobs created after it, if
// the passed ID is not InvalidJobID.
func RunningJobExists(
ctx context.Context,
ignoreJobID jobspb.JobID,
txn isql.Txn,
cv clusterversion.Handle,
jobTypes ...jobspb.Type,
) (exists bool, retErr error) {
if !cv.IsActive(ctx, clusterversion.V23_1BackfillTypeColumnInJobsTable) {
return legacyRunningJobExists(ctx, ignoreJobID, txn, jobTypes...)
}

var typeStrs string
switch len(jobTypes) {
case 0:
return false, errors.AssertionFailedf("must specify job types")
case 1:
typeStrs = fmt.Sprintf("('%s')", jobTypes[0].String())
case 2:
typeStrs = fmt.Sprintf("('%s', '%s')", jobTypes[0].String(), jobTypes[1].String())
default:
var s strings.Builder
fmt.Fprintf(&s, "('%s'", jobTypes[0].String())
for _, typ := range jobTypes[1:] {
fmt.Fprintf(&s, ", '%s'", typ.String())
}
s.WriteByte(')')
typeStrs = s.String()
}

stmt := `
SELECT
id
FROM
system.jobs@jobs_status_created_idx
WHERE
job_type IN ` + typeStrs + ` AND
status IN ` + NonTerminalStatusTupleString + `
ORDER BY created
LIMIT 1`
it, err := txn.QueryIterator(
ctx,
"find-running-jobs-of-type",
txn.KV(),
stmt,
)
if err != nil {
return false, err
}
// We have to make sure to close the iterator since we might return from the
// for loop early (before Next() returns false).
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()

ok, err := it.Next(ctx)
if err != nil {
return false, err
}
// The query is ordered by `created` so if the first is the ignored ID, then
// any additional rows that would match the passed types must be created after
// the ignored ID and are also supposed to be ignored, meaning we only return
// true when the there are non-zero results and the first does not match.
return ok && jobspb.JobID(*it.Cur()[0].(*tree.DInt)) != ignoreJobID, nil
}

func legacyRunningJobExists(
ctx context.Context, jobID jobspb.JobID, txn isql.Txn, jobTypes ...jobspb.Type,
) (exists bool, retErr error) {
const stmt = `
SELECT
Expand Down Expand Up @@ -68,7 +131,7 @@ ORDER BY created`
}
if isTyp {
id := jobspb.JobID(*row[0].(*tree.DInt))
if id == ignoreJobID {
if id == jobID {
break
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro

var job *jobs.Job
if err := m.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn,
exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn, m.settings.Version,
jobspb.TypeAutoSpanConfigReconciliation)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) erro
}
var exists bool
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
exists, err = jobs.RunningJobExists(ctx, jobID, txn,
exists, err = jobs.RunningJobExists(ctx, jobID, txn, p.ExecCfg().Settings.Version,
jobspb.TypeCreateStats, jobspb.TypeAutoCreateStats,
)
return err
Expand Down

0 comments on commit ef850fb

Please sign in to comment.