Skip to content

Commit

Permalink
jobs,changefeedccl: only load adopted jobs
Browse files Browse the repository at this point in the history
The changeFrontier updates the job multiple times during its run. It
does this via a *jobs.Job object that it explicitly loads at startup
using a JobID that is passed to it via its distflow spec.

However, when loading the job in this way, the sessionID that was
associated with the initial adoption of the job is not set. This means
that this change frontier can continue updating the job without error
even long after the session from the original adoption has
expired. This can potentially lead to a single changefeed job running
twice.

This change addresses that by introducing a new method, LoadAdoptedJob,
on the Registry, which consults the Registry's map of adopted jobs to
find the sessionID and ensures that the sessionID is included in the
query that loads the job.

Note that for this to be correct it _requires_ that the changeFrontier
is only ever scheduled on the sql instance that adopts the job.

This API should probably change further to make it much harder to
interact with jobs that haven't been adopted by your registry.
However, the goal of this change is to be backportable.

Release note (enterprise change): Fixes a bug that could have led to
duplicate instances of a single changefeed job running for prolonged
periods of time.
  • Loading branch information
stevendanna committed Sep 24, 2021
1 parent dcba505 commit 567cb04
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadJob(ctx, cf.spec.JobID)
job, err := cf.flowCtx.Cfg.JobRegistry.LoadAdoptedJob(ctx, cf.spec.JobID)
if err != nil {
cf.MoveToDraining(err)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func (b *changefeedResumer) resumeWithRetries(
}
// Re-load the job in order to update our progress object, which may have
// been updated by the changeFrontier processor since the flow started.
reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID)
reloadedJob, reloadErr := execCfg.JobRegistry.LoadAdoptedJob(ctx, jobID)
if reloadErr != nil {
if ctx.Err() != nil {
return ctx.Err()
Expand Down
30 changes: 25 additions & 5 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (j *Job) ID() jobspb.JobID {
return j.id
}

// SessionID reports the sqlliveness session ID stored on this Job value.
// The result is the empty SessionID when no session ID was set on the Job.
func (j *Job) SessionID() sqlliveness.SessionID {
	sid := j.sessionID
	return sid
}

// CreatedBy returns name/id of this job creator. This will be nil if this information
// was not set.
func (j *Job) CreatedBy() *CreatedByInfo {
Expand Down Expand Up @@ -741,11 +746,15 @@ func (j *Job) runInTxn(

// JobNotFoundError is returned from load when the job does not exist.
// When sessionID is non-empty, Error additionally reports the claim session
// ID in its message.
type JobNotFoundError struct {
// NOTE(review): jobID appears on two consecutive lines here; this looks like
// the removed and the re-aligned added line of the diff both being shown —
// confirm against the real file, which should declare it once.
jobID jobspb.JobID
// jobID is the ID of the job reported as missing.
jobID jobspb.JobID
// sessionID, when non-empty, is included in the error message as the claim
// session ID.
sessionID sqlliveness.SessionID
}

// Error implements the error interface for JobNotFoundError. The message
// always names the job ID; the claim session ID is appended only when one is
// recorded on the error.
func (e *JobNotFoundError) Error() string {
	// No session recorded: report the plain "does not exist" message.
	if e.sessionID == "" {
		return fmt.Sprintf("job with ID %d does not exist", e.jobID)
	}
	return fmt.Sprintf("job with ID %d does not exist with claim session id %q", e.jobID, e.sessionID.String())
}

Expand All @@ -760,10 +769,21 @@ func (j *Job) load(ctx context.Context, txn *kv.Txn) error {
var createdBy *CreatedByInfo

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
const stmt = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1"
row, err := j.registry.ex.QueryRowEx(
ctx, "load-job-query", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, j.ID())
const (
queryNoSessionID = "SELECT payload, progress, created_by_type, created_by_id FROM system.jobs WHERE id = $1"
queryWithSessionID = queryNoSessionID + " AND claim_session_id = $2"
)
sess := sessiondata.InternalExecutorOverride{User: security.RootUserName()}

var err error
var row tree.Datums
if j.sessionID == "" {
row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess,
queryNoSessionID, j.ID())
} else {
row, err = j.registry.ex.QueryRowEx(ctx, "load-job-query", txn, sess,
queryWithSessionID, j.ID(), j.SessionID().UnsafeBytes())
}
if err != nil {
return err
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,29 @@ func (r *Registry) LoadJob(ctx context.Context, jobID jobspb.JobID) (*Job, error
return r.LoadJobWithTxn(ctx, jobID, nil)
}

// LoadAdoptedJob loads an existing job with the given jobID from the
// system.jobs table. The job must have already been adopted by this
// Registry: if jobID is not present in the Registry's adopted-jobs map, a
// *JobNotFoundError is returned without querying the table.
//
// The session ID recorded for the adopted job is set on the returned Job
// before it is loaded.
func (r *Registry) LoadAdoptedJob(ctx context.Context, jobID jobspb.JobID) (*Job, error) {
	// Hold the registry mutex only while reading the adopted-jobs map. The
	// load below issues a query, and keeping r.mu locked across it would
	// stall every other user of the mutex for the duration of that query.
	r.mu.Lock()
	adoptedJob, ok := r.mu.adoptedJobs[jobID]
	var j *Job
	if ok {
		j = &Job{
			id:        jobID,
			sessionID: adoptedJob.sid,
			registry:  r,
		}
	}
	r.mu.Unlock()

	if !ok {
		return nil, &JobNotFoundError{jobID: jobID}
	}
	if err := j.load(ctx, nil); err != nil {
		return nil, err
	}
	return j, nil
}

// LoadJobWithTxn does the same as LoadJob, but using the transaction passed in
// the txn argument. Passing a nil transaction is equivalent to calling LoadJob
// in that a transaction will be automatically created.
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -178,6 +179,8 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF
"job %d: with status '%s': expected session '%s' but found '%s'",
j.ID(), statusString, j.sessionID, storedSession)
}
} else {
log.VInfof(ctx, 1, "job %s: update called with no session ID", j.sessionID.String())
}
if payload, err = UnmarshalPayload(row[1]); err != nil {
return err
Expand Down

0 comments on commit 567cb04

Please sign in to comment.