diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 8379f19b1e22..f52014f88097 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -219,6 +219,7 @@ go_test( "//pkg/sql/types", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index fce7ff9a599f..5b7f9a2600b0 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -420,26 +420,57 @@ func (r *Registry) runJob( log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) + // NB: After this point, the job may no longer have the claim + // and further updates to the job record from this node may + // fail. + r.maybeClearLease(job, err) + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3 + AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. + r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + return + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 9d4ea4cc6e0a..6d9089648d27 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1204,10 +1205,11 @@ func TestJobLifecycle(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -1462,6 +1464,10 @@ func TestJobLifecycle(t *testing.T) { t.Fatal(err) } + // Wait for job to be adopted so that we have the + // lease and can move to succeeded. + resumeSignaler.WaitForResumeStarted() + // PauseRequested fails after job is successful. if err := job.Succeeded(ctx); err != nil { t.Fatal(err) @@ -3161,6 +3167,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { require.Equal(t, expectedFiles, filesInZip) } +type resumeStartedSignaler struct { + syncutil.Mutex + cond *sync.Cond + isStarted bool +} + +func newResumeStartedSignaler() *resumeStartedSignaler { + ret := &resumeStartedSignaler{} + ret.cond = sync.NewCond(&ret.Mutex) + return ret + +} + +func (r *resumeStartedSignaler) SignalResumeStarted() { + r.Lock() + r.isStarted = true + r.cond.Signal() + r.Unlock() +} + +func (r *resumeStartedSignaler) WaitForResumeStarted() { + r.Lock() + for !r.isStarted { + r.cond.Wait() + } + r.isStarted = false + r.Unlock() +} + // TestPauseReason tests pausing a job with a user specified reason. func TestPauseReason(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3177,10 +3212,11 @@ func TestPauseReason(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -3212,9 +3248,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil @@ -3227,6 +3270,7 @@ func TestPauseReason(t *testing.T) { q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) mustHaveClaim() + resumeSignaler.WaitForResumeStarted() getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3260,6 +3304,7 @@ func TestPauseReason(t *testing.T) { checkStatusAndPauseReason(t, jobID, "running", "for testing") mustHaveClaim() + resumeSignaler.WaitForResumeStarted() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index a0f1fe2e72ee..795fbb828a37 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1522,7 +1522,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j return } if updateErr != nil { - log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err) + log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr) } } diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index c67a94253132..1fb3e56ad6fb 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -82,6 +82,19 @@ func waitForJobToHaveStatus( } } +func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { + t.Helper() + testutils.SucceedsWithin(t, func() error { + var sessionID []byte + var instanceID gosql.NullInt64 + db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID) + if sessionID == nil && !instanceID.Valid { + return nil + } + return errors.Newf("job %d still has claim information") + }, 2*time.Minute) +} + // RunJob runs the provided job control statement, initializing, notifying and // closing the chan at the passed pointer (see below for why) and returning the // jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard