diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 926a8e853af1..51cc97bcc332 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -573,11 +573,11 @@ func (e *confluentAvroEncoder) register( type nativeEncoder struct{} func (e *nativeEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) { - panic("EncodeKey should not be called on nativeEncoder") + return nil, errors.New("EncodeKey unexpectedly called on nativeEncoder") } func (e *nativeEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) { - panic("EncodeValue should not be called on nativeEncoder") + return nil, errors.New("EncodeValue unexpectedly called on nativeEncoder") } func (e *nativeEncoder) EncodeResolvedTimestamp( diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 7769ad47aa9c..624a951da7e8 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -552,15 +552,20 @@ func (r *Registry) LoadJobWithTxn( // UpdateJobWithTxn calls the Update method on an existing job with jobID, using // a transaction passed in the txn argument. Passing a nil transaction means -// that a txn will be automatically created. +// that a txn will be automatically created. The useReadLock parameter will +// have the update acquire an exclusive lock on the job row when reading. This +// can help eliminate restarts in the face of concurrent updates at the cost of +// locking the row from readers. Most updates of a job do not expect contention +// and may do extra work and thus should not do locking. Cases where the job +// is used to coordinate resources from multiple nodes may benefit from locking. func (r *Registry) UpdateJobWithTxn( - ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, updateFunc UpdateFn, + ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, useReadLock bool, updateFunc UpdateFn, ) error { j := &Job{ id: jobID, registry: r, } - return j.Update(ctx, txn, updateFunc) + return j.update(ctx, txn, useReadLock, updateFunc) } var maxAdoptionsPerLoop = envutil.EnvOrDefaultInt(`COCKROACH_JOB_ADOPTIONS_PER_PERIOD`, 10) diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index d0c45045c27c..cb2c24d0be94 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -116,19 +116,21 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU // Note that there are various convenience wrappers (like FractionProgressed) // defined in jobs.go. func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error { + const useReadLock = false + return j.update(ctx, txn, useReadLock, updateFn) +} + +func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn UpdateFn) error { var payload *jobspb.Payload var progress *jobspb.Progress + if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error { - stmt := "SELECT status, payload, progress FROM system.jobs WHERE id = $1 FOR UPDATE" - if j.sessionID != "" { - stmt = "SELECT status, payload, progress, claim_session_id FROM system." + - "jobs WHERE id = $1 FOR UPDATE" - } var err error var row tree.Datums row, err = j.registry.ex.QueryRowEx( - ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, - stmt, j.ID(), + ctx, "log-job", txn, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + getSelectStmtForJobUpdate(j.sessionID != "", useReadLock), j.ID(), ) if err != nil { return err @@ -253,3 +255,23 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error } return nil } + +// getSelectStmtForJobUpdate constructs the select statement used in Job.update. +func getSelectStmtForJobUpdate(hasSessionID, useReadLock bool) string { + const ( + selectWithoutSession = `SELECT status, payload, progress` + selectWithSession = selectWithoutSession + `, claim_session_id` + from = ` FROM system.jobs WHERE id = $1` + fromForUpdate = from + ` FOR UPDATE` + ) + if hasSessionID { + if useReadLock { + return selectWithSession + fromForUpdate + } + return selectWithSession + from + } + if useReadLock { + return selectWithoutSession + fromForUpdate + } + return selectWithoutSession + from +} diff --git a/pkg/sql/row/expr_walker.go b/pkg/sql/row/expr_walker.go index 2dfb60dd5721..9a81464e119e 100644 --- a/pkg/sql/row/expr_walker.go +++ b/pkg/sql/row/expr_walker.go @@ -343,7 +343,8 @@ func (j *SeqChunkProvider) RequestChunk( ju.UpdateProgress(progress) return nil } - err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, resolveChunkFunc) + const useReadLock = true + err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, useReadLock, resolveChunkFunc) if err != nil { return err }