Skip to content

Commit

Permalink
Merge pull request #102238 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1.0-102171

release-23.1.0: jobs: use job_type column for RunningJobExists queries
  • Loading branch information
dt authored Apr 25, 2023
2 parents 863b9a4 + b1a9518 commit e0f36a2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 16 deletions.
85 changes: 77 additions & 8 deletions pkg/jobs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,85 @@ package jobs

import (
"context"
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)

// RunningJobExists checks that whether there are any other jobs (matched by
// payloadPredicate callback) in the pending, running, or paused status that
// started earlier than the job with provided jobID.
// If the provided jobID is a jobspb.InvalidJobID, this function checks if
// exists any jobs that matches the payloadPredicate.
// RunningJobExists checks that whether there are any job of the given types
// in the pending, running, or paused status, optionally ignoring the job with
// the ID specified by ignoreJobID as well as any jobs created after it, if
// the passed ID is not InvalidJobID.
func RunningJobExists(
ctx context.Context,
jobID jobspb.JobID,
ignoreJobID jobspb.JobID,
txn isql.Txn,
payloadPredicate func(payload *jobspb.Payload) bool,
cv clusterversion.Handle,
jobTypes ...jobspb.Type,
) (exists bool, retErr error) {
if !cv.IsActive(ctx, clusterversion.V23_1BackfillTypeColumnInJobsTable) {
return legacyRunningJobExists(ctx, ignoreJobID, txn, jobTypes...)
}

var typeStrs string
switch len(jobTypes) {
case 0:
return false, errors.AssertionFailedf("must specify job types")
case 1:
typeStrs = fmt.Sprintf("('%s')", jobTypes[0].String())
case 2:
typeStrs = fmt.Sprintf("('%s', '%s')", jobTypes[0].String(), jobTypes[1].String())
default:
var s strings.Builder
fmt.Fprintf(&s, "('%s'", jobTypes[0].String())
for _, typ := range jobTypes[1:] {
fmt.Fprintf(&s, ", '%s'", typ.String())
}
s.WriteByte(')')
typeStrs = s.String()
}

stmt := `
SELECT
id
FROM
system.jobs@jobs_status_created_idx
WHERE
job_type IN ` + typeStrs + ` AND
status IN ` + NonTerminalStatusTupleString + `
ORDER BY created
LIMIT 1`
it, err := txn.QueryIterator(
ctx,
"find-running-jobs-of-type",
txn.KV(),
stmt,
)
if err != nil {
return false, err
}
// We have to make sure to close the iterator since we might return from the
// for loop early (before Next() returns false).
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()

ok, err := it.Next(ctx)
if err != nil {
return false, err
}
// The query is ordered by `created` so if the first is the ignored ID, then
// any additional rows that would match the passed types must be created after
// the ignored ID and are also supposed to be ignored, meaning we only return
// true when the there are non-zero results and the first does not match.
return ok && jobspb.JobID(*it.Cur()[0].(*tree.DInt)) != ignoreJobID, nil
}

func legacyRunningJobExists(
ctx context.Context, jobID jobspb.JobID, txn isql.Txn, jobTypes ...jobspb.Type,
) (exists bool, retErr error) {
const stmt = `
SELECT
Expand Down Expand Up @@ -60,7 +122,14 @@ ORDER BY created`
return false /* exists */, err
}

if payloadPredicate(payload) {
isTyp := false
for _, typ := range jobTypes {
if payload.Type() == typ {
isTyp = true
break
}
}
if isTyp {
id := jobspb.JobID(*row[0].(*tree.DInt))
if id == jobID {
break
Expand Down
7 changes: 2 additions & 5 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,8 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro

var job *jobs.Job
if err := m.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn,
func(payload *jobspb.Payload) bool {
return payload.Type() == jobspb.TypeAutoSpanConfigReconciliation
},
)
exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn, m.settings.Version,
jobspb.TypeAutoSpanConfigReconciliation)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,9 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) erro
}
var exists bool
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
exists, err = jobs.RunningJobExists(ctx, jobID, txn, func(payload *jobspb.Payload) bool {
return payload.Type() == jobspb.TypeCreateStats || payload.Type() == jobspb.TypeAutoCreateStats
})
exists, err = jobs.RunningJobExists(ctx, jobID, txn, p.ExecCfg().Settings.Version,
jobspb.TypeCreateStats, jobspb.TypeAutoCreateStats,
)
return err
}); err != nil {
return err
Expand Down

0 comments on commit e0f36a2

Please sign in to comment.