Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: clear job claim after execution #91563

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ go_test(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
feed := testFeed.(cdctest.EnterpriseTestFeed)

require.NoError(t, feed.Pause())

jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID())
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID()))

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand Down
41 changes: 36 additions & 5 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,26 +417,57 @@ func (r *Registry) runJob(
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
}

r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
// NB: After this point, the job may no longer have the claim
// and further updates to the job record from this node may
// fail.
r.maybeClearLease(job, err)
r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
}
return err
}

const clearClaimQuery = `
UPDATE system.jobs
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`

// maybeClearLease clears the claim on the given job, provided that
// the current lease matches our liveness Session.
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
if jobErr == nil {
return
}

// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
}
log.VEventf(ctx, 2, "cleared leases for %d jobs", n)
})
}

const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END,
num_runs = 0,
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
last_run = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestScheduleControl(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelper(t)
defer cleanup()

Expand Down Expand Up @@ -139,6 +142,8 @@ func TestScheduleControl(t *testing.T) {

func TestJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil)
defer cleanup()

Expand Down Expand Up @@ -247,6 +252,7 @@ func TestJobsControlForSchedules(t *testing.T) {
// jobs prior to executing the control command.
func TestFilterJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer ResetConstructors()()

argsFn := func(args *base.TestServerArgs) {
Expand Down Expand Up @@ -327,6 +333,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {

func TestJobControlByType(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer ResetConstructors()()

argsFn := func(args *base.TestServerArgs) {
Expand Down
51 changes: 48 additions & 3 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"runtime/pprof"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1211,10 +1212,11 @@ func TestJobLifecycle(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -1469,6 +1471,10 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3109,6 +3115,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) {
require.Equal(t, expectedFiles, filesInZip)
}

type resumeStartedSignaler struct {
syncutil.Mutex
cond *sync.Cond
isStarted bool
}

func newResumeStartedSignaler() *resumeStartedSignaler {
ret := &resumeStartedSignaler{}
ret.cond = sync.NewCond(&ret.Mutex)
return ret

}

func (r *resumeStartedSignaler) SignalResumeStarted() {
r.Lock()
r.isStarted = true
r.cond.Signal()
r.Unlock()
}

func (r *resumeStartedSignaler) WaitForResumeStarted() {
r.Lock()
for !r.isStarted {
r.cond.Wait()
}
r.isStarted = false
r.Unlock()
}

// TestPauseReason tests pausing a job with a user specified reason.
func TestPauseReason(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -3125,10 +3160,11 @@ func TestPauseReason(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -3160,9 +3196,16 @@ func TestPauseReason(t *testing.T) {
return n
}
mustNotHaveClaim := func() {
require.Equal(t, 0, countRowsWithClaimInfo())
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 0 {
return nil
}
return errors.New("still waiting for claim to clear")
})
}
mustHaveClaim := func() {
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 1 {
return nil
Expand All @@ -3175,6 +3218,7 @@ func TestPauseReason(t *testing.T) {
q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()

getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) {
var payloadBytes []byte
Expand Down Expand Up @@ -3208,6 +3252,7 @@ func TestPauseReason(t *testing.T) {

checkStatusAndPauseReason(t, jobID, "running", "for testing")
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()
}
{
// Pause the job again with a different reason. Verify that the job is paused with the reason.
Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestJobsTableClaimFamily(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j
return
}
if updateErr != nil {
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err)
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ func TestRunWithoutLoop(t *testing.T) {

func TestJobIdleness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
intervalOverride := time.Millisecond
Expand Down Expand Up @@ -1111,6 +1112,7 @@ func TestJobIdleness(t *testing.T) {
// allow other job registries in the cluster to claim and run this job.
func TestDisablingJobAdoptionClearsClaimSessionID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

intervalOverride := time.Millisecond
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Expand Down
13 changes: 13 additions & 0 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func waitForJobToHaveStatus(
}, 2*time.Minute)
}

func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
t.Helper()
testutils.SucceedsWithin(t, func() error {
var sessionID []byte
var instanceID gosql.NullInt64
db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID)
if sessionID == nil && !instanceID.Valid {
return nil
}
return errors.Newf("job %d still has claim information")
}, 2*time.Minute)
}

// RunJob runs the provided job control statement, initializing, notifying and
// closing the chan at the passed pointer (see below for why) and returning the
// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
Expand Down