From 2d4f44bf0659271159d244d86fbf37b1cf6018ea Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 24 Apr 2023 18:06:31 +0000 Subject: [PATCH 1/2] jobs: switch RunningJobExists to take list of types The only payload predicates ever used were just to check type so moving that to the signature. Release note: none. Epic: none. --- pkg/jobs/utils.go | 24 +++++++++++++-------- pkg/spanconfig/spanconfigmanager/manager.go | 5 +---- pkg/sql/create_stats.go | 6 +++--- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index 077d3217ca6d..781952be4603 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -19,16 +19,15 @@ import ( "github.com/cockroachdb/errors" ) -// RunningJobExists checks that whether there are any other jobs (matched by -// payloadPredicate callback) in the pending, running, or paused status that -// started earlier than the job with provided jobID. -// If the provided jobID is a jobspb.InvalidJobID, this function checks if -// exists any jobs that matches the payloadPredicate. +// RunningJobExists checks that whether there are any job of the given types in +// the pending, running, or paused status, optionally ignoring the job with the +// ID specified by ignoreJobID, and any jobs created after it, if it is not +// InvalidJobID. 
func RunningJobExists( ctx context.Context, - jobID jobspb.JobID, + ignoreJobID jobspb.JobID, txn isql.Txn, - payloadPredicate func(payload *jobspb.Payload) bool, + jobTypes ...jobspb.Type, ) (exists bool, retErr error) { const stmt = ` SELECT @@ -60,9 +59,16 @@ ORDER BY created` return false /* exists */, err } - if payloadPredicate(payload) { + isTyp := false + for _, typ := range jobTypes { + if payload.Type() == typ { + isTyp = true + break + } + } + if isTyp { id := jobspb.JobID(*row[0].(*tree.DInt)) - if id == jobID { + if id == ignoreJobID { break } diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index de8c293a5f61..bf576f3479ce 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -186,10 +186,7 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro var job *jobs.Job if err := m.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn, - func(payload *jobspb.Payload) bool { - return payload.Type() == jobspb.TypeAutoSpanConfigReconciliation - }, - ) + jobspb.TypeAutoSpanConfigReconciliation) if err != nil { return err } diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 501cd85aaf03..3f4e54666db5 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -707,9 +707,9 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) erro } var exists bool if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) { - exists, err = jobs.RunningJobExists(ctx, jobID, txn, func(payload *jobspb.Payload) bool { - return payload.Type() == jobspb.TypeCreateStats || payload.Type() == jobspb.TypeAutoCreateStats - }) + exists, err = jobs.RunningJobExists(ctx, jobID, txn, + jobspb.TypeCreateStats, jobspb.TypeAutoCreateStats, + ) return err }); err != nil { return err From 
ef850fb9773921d19cfa220fd756623c4d91505c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 24 Apr 2023 18:29:55 +0000 Subject: [PATCH 2/2] jobs: use job_type column for RunningJobExists queries Avoiding the virtual table avoids the join on infos. Release note: none. Epic: none. --- pkg/jobs/utils.go | 73 +++++++++++++++++++-- pkg/spanconfig/spanconfigmanager/manager.go | 2 +- pkg/sql/create_stats.go | 2 +- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index 781952be4603..7af25f6420a5 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -12,22 +12,85 @@ package jobs import ( "context" + "fmt" + "strings" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) -// RunningJobExists checks that whether there are any job of the given types in -// the pending, running, or paused status, optionally ignoring the job with the -// ID specified by ignoreJobID, and any jobs created after it, if it is not -// InvalidJobID. +// RunningJobExists checks whether there are any jobs of the given types +// in the pending, running, or paused status, optionally ignoring the job with +// the ID specified by ignoreJobID as well as any jobs created after it, if +// the passed ID is not InvalidJobID. func RunningJobExists( ctx context.Context, ignoreJobID jobspb.JobID, txn isql.Txn, + cv clusterversion.Handle, jobTypes ...jobspb.Type, +) (exists bool, retErr error) { + if !cv.IsActive(ctx, clusterversion.V23_1BackfillTypeColumnInJobsTable) { + return legacyRunningJobExists(ctx, ignoreJobID, txn, jobTypes...) 
+ } + + var typeStrs string + switch len(jobTypes) { + case 0: + return false, errors.AssertionFailedf("must specify job types") + case 1: + typeStrs = fmt.Sprintf("('%s')", jobTypes[0].String()) + case 2: + typeStrs = fmt.Sprintf("('%s', '%s')", jobTypes[0].String(), jobTypes[1].String()) + default: + var s strings.Builder + fmt.Fprintf(&s, "('%s'", jobTypes[0].String()) + for _, typ := range jobTypes[1:] { + fmt.Fprintf(&s, ", '%s'", typ.String()) + } + s.WriteByte(')') + typeStrs = s.String() + } + + stmt := ` +SELECT + id +FROM + system.jobs@jobs_status_created_idx +WHERE + job_type IN ` + typeStrs + ` AND + status IN ` + NonTerminalStatusTupleString + ` +ORDER BY created +LIMIT 1` + it, err := txn.QueryIterator( + ctx, + "find-running-jobs-of-type", + txn.KV(), + stmt, + ) + if err != nil { + return false, err + } + // We have to make sure to close the iterator since we return after reading + // at most one row (before Next() returns false). + defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }() + + ok, err := it.Next(ctx) + if err != nil { + return false, err + } + // The query is ordered by `created` so if the first is the ignored ID, then + // any additional rows that would match the passed types must be created after + // the ignored ID and are also supposed to be ignored, meaning we only return + // true when there are non-zero results and the first does not match. 
+ return ok && jobspb.JobID(*it.Cur()[0].(*tree.DInt)) != ignoreJobID, nil +} + +func legacyRunningJobExists( + ctx context.Context, jobID jobspb.JobID, txn isql.Txn, jobTypes ...jobspb.Type, ) (exists bool, retErr error) { const stmt = ` SELECT @@ -68,7 +131,7 @@ ORDER BY created` } if isTyp { id := jobspb.JobID(*row[0].(*tree.DInt)) - if id == ignoreJobID { + if id == jobID { break } diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index bf576f3479ce..2d1d02009549 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -185,7 +185,7 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro var job *jobs.Job if err := m.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn, + exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn, m.settings.Version, jobspb.TypeAutoSpanConfigReconciliation) if err != nil { return err diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 3f4e54666db5..dd67725ab8ed 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -707,7 +707,7 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) erro } var exists bool if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) { - exists, err = jobs.RunningJobExists(ctx, jobID, txn, + exists, err = jobs.RunningJobExists(ctx, jobID, txn, p.ExecCfg().Settings.Version, jobspb.TypeCreateStats, jobspb.TypeAutoCreateStats, ) return err