Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove backcompat support for non-atomic job registration #16305

Merged
merged 2 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16305.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:breaking-change
api: job register and register requests from API clients older than version 0.12.1 will not longer emit an evaluation
```
30 changes: 2 additions & 28 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

if existingJob == nil || specChanged {

// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
if eval != nil {
args.Eval = eval
submittedEval = true
}
Expand Down Expand Up @@ -887,10 +885,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
reply.EvalID = eval.ID
}

// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}
args.Eval = eval

// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
Expand All @@ -904,27 +899,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
reply.EvalCreateIndex = index
reply.Index = index

// COMPAT(1.1.0) - Remove entire conditional block
// 0.12.1 introduced atomic job deregistration eval
if eval != nil && args.Eval == nil {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}

reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

err = j.multiregionStop(job, args, reply)
if err != nil {
return err
Expand Down
253 changes: 5 additions & 248 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1998,8 +1998,8 @@ func TestJobEndpoint_Register_SemverConstraint(t *testing.T) {
})
}

// TestJobEndpoint_Register_EvalCreation_Modern asserts that job register creates an eval
// atomically with the registration
// TestJobEndpoint_Register_EvalCreation asserts that job register creates an
// eval atomically with the registration
func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) {
ci.Parallel(t)

Expand Down Expand Up @@ -2119,150 +2119,6 @@ func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) {
})
}

// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job register creates an eval
// atomically with the registration, but handle legacy clients by adding a new eval update
func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()

s2, cleanupS2 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.NumSchedulers = 0 // Prevent automatic dequeue

// simulate presense of a server that doesn't handle
// new registration eval
c.Build = "0.12.0"
})
defer cleanupS2()

TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)

// keep s1 as the leader
if leader, _ := s1.getLeader(); !leader {
s1, s2 = s2, s1
}

codec := rpcClient(t, s1)

// Create the register request
t.Run("job registration always create evals", func(t *testing.T) {
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

//// initial registration should create the job and a new eval
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
require.NoError(t, err)
require.NotZero(t, resp.Index)
require.NotEmpty(t, resp.EvalID)

// Check for the job in the FSM
state := s1.fsm.State()
out, err := state.JobByID(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, resp.JobModifyIndex, out.CreateIndex)

// Lookup the evaluation
eval, err := state.EvalByID(nil, resp.EvalID)
require.NoError(t, err)
require.NotNil(t, eval)
require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex)

raftEval := evalUpdateFromRaft(t, s1, eval.ID)
require.Equal(t, eval, raftEval)

//// re-registration should create a new eval, but leave the job untouched
var resp2 structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2)
require.NoError(t, err)
require.NotZero(t, resp2.Index)
require.NotEmpty(t, resp2.EvalID)
require.NotEqual(t, resp.EvalID, resp2.EvalID)

// Check for the job in the FSM
state = s1.fsm.State()
out, err = state.JobByID(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, resp2.JobModifyIndex, out.CreateIndex)
require.Equal(t, out.CreateIndex, out.JobModifyIndex)

// Lookup the evaluation
eval, err = state.EvalByID(nil, resp2.EvalID)
require.NoError(t, err)
require.NotNil(t, eval)
require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex)

// this raft eval is the one found above
raftEval = evalUpdateFromRaft(t, s1, eval.ID)
require.Equal(t, eval, raftEval)

//// an update should update the job and create a new eval
req.Job.TaskGroups[0].Name += "a"
var resp3 structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3)
require.NoError(t, err)
require.NotZero(t, resp3.Index)
require.NotEmpty(t, resp3.EvalID)
require.NotEqual(t, resp.EvalID, resp3.EvalID)

// Check for the job in the FSM
state = s1.fsm.State()
out, err = state.JobByID(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex)

// Lookup the evaluation
eval, err = state.EvalByID(nil, resp3.EvalID)
require.NoError(t, err)
require.NotNil(t, eval)
require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex)

raftEval = evalUpdateFromRaft(t, s1, eval.ID)
require.Equal(t, eval, raftEval)
})

// Registering a parameterized job shouldn't create an eval
t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) {
job := mock.PeriodicJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
require.NoError(t, err)
require.NotZero(t, resp.Index)
require.Empty(t, resp.EvalID)

// Check for the job in the FSM
state := s1.fsm.State()
out, err := state.JobByID(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, resp.JobModifyIndex, out.CreateIndex)
})
}

func TestJobEndpoint_Register_ValidateMemoryMax(t *testing.T) {
ci.Parallel(t)

Expand Down Expand Up @@ -3786,9 +3642,9 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) {
}
}

// TestJobEndpoint_Deregister_EvalCreation_Modern asserts that job deregister creates an eval
// atomically with the registration
func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) {
// TestJobEndpoint_Deregister_EvalCreation asserts that job deregister creates
// an eval atomically with the registration
func TestJobEndpoint_Deregister_EvalCreation(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
Expand Down Expand Up @@ -3866,105 +3722,6 @@ func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) {
})
}

// TestJobEndpoint_Deregister_EvalCreation_Legacy asserts that job deregister
// creates an eval atomically with the registration, but handle legacy clients
// by adding a new eval update
func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()

s2, cleanupS2 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.NumSchedulers = 0 // Prevent automatic dequeue

// simulate presense of a server that doesn't handle
// new registration eval
c.Build = "0.12.0"
})
defer cleanupS2()

TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)

// keep s1 as the leader
if leader, _ := s1.getLeader(); !leader {
s1, s2 = s2, s1
}

codec := rpcClient(t, s1)

// Create the register request
t.Run("job registration always create evals", func(t *testing.T) {
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
require.NoError(t, err)

dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp2 structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2)
require.NoError(t, err)
require.NotEmpty(t, resp2.EvalID)

state := s1.fsm.State()
eval, err := state.EvalByID(nil, resp2.EvalID)
require.Nil(t, err)
require.NotNil(t, eval)
require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex)

raftEval := evalUpdateFromRaft(t, s1, eval.ID)
require.Equal(t, eval, raftEval)
})

// Registering a parameterized job shouldn't create an eval
t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) {
job := mock.PeriodicJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
require.NoError(t, err)
require.NotZero(t, resp.Index)

dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp2 structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2)
require.NoError(t, err)
require.Empty(t, resp2.EvalID)
})
}

func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down
2 changes: 0 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0"))

var minClusterIDVersion = version.Must(version.NewVersion("0.10.4"))

var minJobRegisterAtomicEvalVersion = version.Must(version.NewVersion("0.12.1"))

var minOneTimeAuthenticationTokenVersion = version.Must(version.NewVersion("1.1.0"))

// minACLRoleVersion is the Nomad version at which the ACL role table was
Expand Down
21 changes: 0 additions & 21 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,6 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
eval.CreateIndex = index
eval.ModifyIndex = index

// COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}

// Commit this evaluation via Raft
// There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not, before Nomad 0.12.1
_, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
return nil, err
}

// Update its indexes.
eval.CreateIndex = evalIndex
eval.ModifyIndex = evalIndex
}

return eval, nil
}

Expand Down