Skip to content

Commit

Permalink
Merge #67660 #68241
Browse files Browse the repository at this point in the history
67660: jobs: remove FOR UPDATE clause when updating job r=ajwerner a=ajwerner

In CockroachDB, the `FOR UPDATE` lock is currently an exclusive lock. That
means that clients trying to inspect jobs and the job adoption loops will
both try to scan the table and encounter these locks. For the most part, we
don't really update the job from the leaves of a distsql flow. There is an
exception which is IMPORT incrementing a sequence. Nevertheless, the retry
behavior there seems sound. The other exception is pausing or canceling jobs.
I think that in that case we prefer to invalidate the work of the transaction
as our intention is to cancel it.

If cockroach implemented UPGRADE locks (#49684), then this FOR UPDATE would
not be a problem.

Release note (performance improvement): Jobs no longer hold exclusive locks
for the duration of their checkpointing transactions. Previously, these locks
could result in long wait times when trying to run SHOW JOBS.

68241: changefeedccl: Change unreachable panic to unreachable error r=[miretskiy] a=HonoreDB

I missed the context but apparently we're doing this everywhere we can.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
3 people committed Jul 29, 2021
3 parents 4938d16 + 269bf63 + 6c1d2e7 commit 539496d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,11 @@ func (e *confluentAvroEncoder) register(
type nativeEncoder struct{}

func (e *nativeEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) {
panic("EncodeKey should not be called on nativeEncoder")
return nil, errors.New("EncodeKey unexpectedly called on nativeEncoder")
}

func (e *nativeEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) {
panic("EncodeValue should not be called on nativeEncoder")
return nil, errors.New("EncodeValue unexpectedly called on nativeEncoder")
}

func (e *nativeEncoder) EncodeResolvedTimestamp(
Expand Down
11 changes: 8 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,15 +552,20 @@ func (r *Registry) LoadJobWithTxn(

// UpdateJobWithTxn calls the Update method on an existing job with jobID, using
// a transaction passed in the txn argument. Passing a nil transaction means
// that a txn will be automatically created.
// that a txn will be automatically created. The useReadLock parameter will
// have the update acquire an exclusive lock on the job row when reading. This
// can help eliminate restarts in the face of concurrent updates at the cost of
// locking the row from readers. Most updates of a job do not expect contention
// and may do extra work and thus should not do locking. Cases where the job
// is used to coordinate resources from multiple nodes may benefit from locking.
func (r *Registry) UpdateJobWithTxn(
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, updateFunc UpdateFn,
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, useReadLock bool, updateFunc UpdateFn,
) error {
j := &Job{
id: jobID,
registry: r,
}
return j.Update(ctx, txn, updateFunc)
return j.update(ctx, txn, useReadLock, updateFunc)
}

var maxAdoptionsPerLoop = envutil.EnvOrDefaultInt(`COCKROACH_JOB_ADOPTIONS_PER_PERIOD`, 10)
Expand Down
36 changes: 29 additions & 7 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,21 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU
// Note that there are various convenience wrappers (like FractionProgressed)
// defined in jobs.go.
func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error {
	// The exported entry point always reads the job row without a lock
	// (useReadLock=false); callers that want the locking read go through
	// the unexported update method directly.
	return j.update(ctx, txn, false /* useReadLock */, updateFn)
}

func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn UpdateFn) error {
var payload *jobspb.Payload
var progress *jobspb.Progress

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
stmt := "SELECT status, payload, progress FROM system.jobs WHERE id = $1 FOR UPDATE"
if j.sessionID != "" {
stmt = "SELECT status, payload, progress, claim_session_id FROM system." +
"jobs WHERE id = $1 FOR UPDATE"
}
var err error
var row tree.Datums
row, err = j.registry.ex.QueryRowEx(
ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, j.ID(),
ctx, "log-job", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
getSelectStmtForJobUpdate(j.sessionID != "", useReadLock), j.ID(),
)
if err != nil {
return err
Expand Down Expand Up @@ -253,3 +255,23 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error
}
return nil
}

// getSelectStmtForJobUpdate returns the SQL text that Job.update uses to read
// a job's row from system.jobs by id.
//
// When hasSessionID is true the claim_session_id column is added to the
// projection. When useReadLock is true the statement ends in FOR UPDATE.
// Every result is a compile-time constant, so no string is built at runtime.
func getSelectStmtForJobUpdate(hasSessionID, useReadLock bool) string {
	const (
		baseColumns    = `SELECT status, payload, progress`
		sessionColumns = baseColumns + `, claim_session_id`
		byID           = ` FROM system.jobs WHERE id = $1`
		byIDLocked     = byID + ` FOR UPDATE`
	)
	switch {
	case hasSessionID && useReadLock:
		return sessionColumns + byIDLocked
	case hasSessionID:
		return sessionColumns + byID
	case useReadLock:
		return baseColumns + byIDLocked
	default:
		return baseColumns + byID
	}
}
3 changes: 2 additions & 1 deletion pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ func (j *SeqChunkProvider) RequestChunk(
ju.UpdateProgress(progress)
return nil
}
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, resolveChunkFunc)
const useReadLock = true
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, useReadLock, resolveChunkFunc)
if err != nil {
return err
}
Expand Down

0 comments on commit 539496d

Please sign in to comment.