From 2ba983d1731548db3f834c3995f5568f1443f4d0 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 9 Nov 2022 00:17:41 +0000 Subject: [PATCH 1/2] jobs: clear job claim after execution Since #89014 the job system reset a job's claim when transitioning it from pause-requested to paused and from cancel-requested to reverting. The job system signals these transitions to the running Resumer by cancelling the job's context and does not wait for the resumer to exit. Once the claim is clear, another node can adopt the job and start running it's OnFailOrCancel callback. As a result, clearing the context makes it more likely that OnFailOrCancel executions will overlap with Resume executions. In general, Jobs need to assume that Resume may still be running while OnFailOrCancel is called. But, making it more likely isn't in our interest. Here, we only clear the lease when we exit the job state machine. This makes it much more likely that OnFailOrCancel doesn't start until Resume has returned. Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/alter_changefeed_test.go | 3 +- pkg/jobs/adopt.go | 41 +++++++++++++-- pkg/jobs/jobs_test.go | 51 +++++++++++++++++-- pkg/jobs/registry.go | 2 +- pkg/testutils/jobutils/jobs_verification.go | 13 +++++ 6 files changed, 101 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 5cccc7d139bf..ce5b5782aeb6 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -250,6 +250,7 @@ go_test( "//pkg/sql/types", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 39aefc5108fb..759df1a35439 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) { feed := testFeed.(cdctest.EnterpriseTestFeed) require.NoError(t, feed.Pause()) - + jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID()) sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID())) counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 69a7c6ed399e..24557ddcada4 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -417,26 +417,57 @@ func (r *Registry) runJob( log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) + // NB: After this point, the job may no longer have the claim + // and further updates to the job record from this node may + // fail. + r.maybeClearLease(job, err) + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3 + AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. + r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + return + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 243cab6fd063..a312245fd419 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -21,6 +21,7 @@ import ( "runtime/pprof" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1211,10 +1212,11 @@ func TestJobLifecycle(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -1469,6 +1471,10 @@ func TestJobLifecycle(t *testing.T) { t.Fatal(err) } + // Wait for job to be adopted so that we have the + // lease and can move to succeeded. + resumeSignaler.WaitForResumeStarted() + // PauseRequested fails after job is successful. if err := job.Succeeded(ctx); err != nil { t.Fatal(err) @@ -3109,6 +3115,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { require.Equal(t, expectedFiles, filesInZip) } +type resumeStartedSignaler struct { + syncutil.Mutex + cond *sync.Cond + isStarted bool +} + +func newResumeStartedSignaler() *resumeStartedSignaler { + ret := &resumeStartedSignaler{} + ret.cond = sync.NewCond(&ret.Mutex) + return ret + +} + +func (r *resumeStartedSignaler) SignalResumeStarted() { + r.Lock() + r.isStarted = true + r.cond.Signal() + r.Unlock() +} + +func (r *resumeStartedSignaler) WaitForResumeStarted() { + r.Lock() + for !r.isStarted { + r.cond.Wait() + } + r.isStarted = false + r.Unlock() +} + // TestPauseReason tests pausing a job with a user specified reason. func TestPauseReason(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3125,10 +3160,11 @@ func TestPauseReason(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -3160,9 +3196,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil @@ -3175,6 +3218,7 @@ func TestPauseReason(t *testing.T) { q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) mustHaveClaim() + resumeSignaler.WaitForResumeStarted() getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3208,6 +3252,7 @@ func TestPauseReason(t *testing.T) { checkStatusAndPauseReason(t, jobID, "running", "for testing") mustHaveClaim() + resumeSignaler.WaitForResumeStarted() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index af16ea4a9470..ce7d1a8e3e41 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1576,7 +1576,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j return } if updateErr != nil { - log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err) + log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr) } } diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index 1872596d41c7..be8febec0f5b 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -89,6 +89,19 @@ func waitForJobToHaveStatus( }, 2*time.Minute) } +func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { + t.Helper() + testutils.SucceedsWithin(t, func() error { + var sessionID []byte + var instanceID gosql.NullInt64 + db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID) + if sessionID == nil && !instanceID.Valid { + return nil + } + return errors.Newf("job %d still has claim information") + }, 2*time.Minute) +} + // RunJob runs the provided job control statement, initializing, notifying and // closing the chan at the passed pointer (see below for why) and returning the // jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard From deb0896bdd7e43ab457f6edb5a2834067ee87cfa Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 10 Nov 2022 12:09:51 +0000 Subject: [PATCH 2/2] jobs: add log scopes to tests Release note: None --- pkg/jobs/delegate_control_test.go | 7 +++++++ pkg/jobs/lease_test.go | 2 ++ pkg/jobs/registry_test.go | 2 ++ 3 files changed, 11 insertions(+) diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 0272f38acec2..e6f2ceb045b5 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -24,12 +24,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestScheduleControl(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelper(t) defer cleanup() @@ -139,6 +142,8 @@ func TestScheduleControl(t *testing.T) { func TestJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil) defer cleanup() @@ -247,6 +252,7 @@ func TestJobsControlForSchedules(t *testing.T) { // jobs prior to executing the control command. func TestFilterJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { @@ -327,6 +333,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { func TestJobControlByType(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { diff --git a/pkg/jobs/lease_test.go b/pkg/jobs/lease_test.go index ba9c2d027fcf..816882de5ee9 100644 --- a/pkg/jobs/lease_test.go +++ b/pkg/jobs/lease_test.go @@ -20,12 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestJobsTableClaimFamily(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index e9f0c18c4a39..d8aad354bd0b 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -973,6 +973,7 @@ func TestRunWithoutLoop(t *testing.T) { func TestJobIdleness(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() intervalOverride := time.Millisecond @@ -1111,6 +1112,7 @@ func TestJobIdleness(t *testing.T) { // allow other job registries in the cluster to claim and run this job. func TestDisablingJobAdoptionClearsClaimSessionID(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) intervalOverride := time.Millisecond s, db, _ := serverutils.StartServer(t, base.TestServerArgs{