Skip to content

Commit

Permalink
jobs: remove FOR UPDATE clause when updating job in most cases
Browse files Browse the repository at this point in the history
In cockroachdb currently, the `FOR UPDATE` lock in an exclusive lock. That
means that both clients trying to inspect jobs and the job adoption loops will
both try to scan the table and encounter these locks. For the most part, we
don't really update the job from the leaves of a distsql flow.

There is an exception which is IMPORT incrementing a sequence. In that case,
which motivated the initial locking addition, we'll leave the locking.

The other exception is pausing or canceling jobs. I think that in that case
we prefer to invalidate the work of the transaction as our intention is to
cancel it.

If cockroach implemented UPGRADE locks (#49684), then this FOR UPDATE would
not be a problem.

Release note (performance improvement): Jobs no longer hold exclusive locks
during the duration of their checkpointing transactions which can result in
long wait times when trying to run SHOW JOBS.
  • Loading branch information
ajwerner committed Jul 15, 2021
1 parent b5249b3 commit 84654db
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
11 changes: 8 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,15 +599,20 @@ func (r *Registry) LoadJobWithTxn(

// UpdateJobWithTxn calls the Update method on an existing job with jobID, using
// a transaction passed in the txn argument. Passing a nil transaction means
// that a txn will be automatically created.
// that a txn will be automatically created. The useReadLock parameter will
// have the update acquire an exclusive lock on the job row when reading. This
// can help eliminate restarts in the face of concurrent updates at the cost of
// locking the row from readers. Most updates of a job do not expect contention
// and may do extra work and thus should not do locking. Cases where the job
// is used to coordinate resources from multiple nodes may benefit from locking.
func (r *Registry) UpdateJobWithTxn(
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, updateFunc UpdateFn,
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, useReadLock bool, updateFunc UpdateFn,
) error {
j := &Job{
id: jobID,
registry: r,
}
return j.Update(ctx, txn, updateFunc)
return j.update(ctx, txn, useReadLock, updateFunc)
}

// DefaultCancelInterval is a reasonable interval at which to poll this node
Expand Down
36 changes: 29 additions & 7 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,21 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU
// Note that there are various convenience wrappers (like FractionProgressed)
// defined in jobs.go.
func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error {
const useReadLock = false
return j.update(ctx, txn, useReadLock, updateFn)
}

func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn UpdateFn) error {
var payload *jobspb.Payload
var progress *jobspb.Progress

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
stmt := "SELECT status, payload, progress FROM system.jobs WHERE id = $1 FOR UPDATE"
if j.sessionID != "" {
stmt = "SELECT status, payload, progress, claim_session_id FROM system." +
"jobs WHERE id = $1 FOR UPDATE"
}
var err error
var row tree.Datums
row, err = j.registry.ex.QueryRowEx(
ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, j.ID(),
ctx, "log-job", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
getSelectStmtForJobUpdate(j.sessionID != "", useReadLock), j.ID(),
)
if err != nil {
return err
Expand Down Expand Up @@ -253,3 +255,23 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error
}
return nil
}

// getSelectStmtForJobUpdate constructs the select statement used in Job.update.
func getSelectStmtForJobUpdate(hasSessionID, useReadLock bool) string {
const (
selectWithoutSession = `SELECT status, payload, progress`
selectWithSession = selectWithoutSession + `, claim_session_id`
from = ` FROM system.jobs WHERE id = $1`
fromForUpdate = from + ` FOR UPDATE`
)
if hasSessionID {
if useReadLock {
return selectWithSession + fromForUpdate
}
return selectWithSession + from
}
if useReadLock {
return selectWithoutSession + fromForUpdate
}
return selectWithoutSession + from
}
3 changes: 2 additions & 1 deletion pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ func (j *SeqChunkProvider) RequestChunk(
ju.UpdateProgress(progress)
return nil
}
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, resolveChunkFunc)
const useReadLock = true
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, useReadLock, resolveChunkFunc)
if err != nil {
return err
}
Expand Down

0 comments on commit 84654db

Please sign in to comment.