Skip to content

Commit

Permalink
jobs: Clear out claim info when pausing
Browse files Browse the repository at this point in the history
Clear out job claim information when job is paused.
Clearing out claim information is beneficial since it allows
operator to pause/resume job if they want to try to move
job coordinator to another node.

Release note: none
  • Loading branch information
Yevgeniy Miretskiy committed Sep 30, 2022
1 parent 3a9f5b8 commit b2a6b80
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ const pauseAndCancelUpdate = `
ELSE status
END,
num_runs = 0,
last_run = NULL
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down
25 changes: 25 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3138,9 +3138,31 @@ func TestPauseReason(t *testing.T) {
_, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */)
require.NoError(t, err)

countRowsWithClaimInfo := func() int {
t.Helper()
n := 0
tdb.QueryRow(t,
"SELECT count(*) FROM system.jobs "+
"WHERE id = $1 AND (claim_session_id IS NOT NULL OR claim_instance_id IS NOT NULL)",
jobID).Scan(&n)
return n
}
mustNotHaveClaim := func() {
require.Equal(t, 0, countRowsWithClaimInfo())
}
mustHaveClaim := func() {
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 1 {
return nil
}
return errors.New("still waiting for claim info")
})
}

// First wait until the job is running
q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})
mustHaveClaim()

getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) {
var payloadBytes []byte
Expand All @@ -3164,6 +3186,7 @@ func TestPauseReason(t *testing.T) {
require.NoError(t, registry.PauseRequested(ctx, nil, jobID, "for testing"))
tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}})
checkStatusAndPauseReason(t, jobID, "paused", "for testing")
mustNotHaveClaim()
}

{
Expand All @@ -3172,12 +3195,14 @@ func TestPauseReason(t *testing.T) {
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})

checkStatusAndPauseReason(t, jobID, "running", "for testing")
mustHaveClaim()
}
{
// Pause the job again with a different reason. Verify that the job is paused with the reason.
require.NoError(t, registry.PauseRequested(ctx, nil, jobID, "second time"))
tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}})
checkStatusAndPauseReason(t, jobID, "paused", "second time")
mustNotHaveClaim()
}
}

Expand Down

0 comments on commit b2a6b80

Please sign in to comment.