Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add PreserveCounts to Job.Register #8168

Merged
merged 6 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ FEATURES:

IMPROVEMENTS:

* core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)]
* api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)]
* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/9172)]

Expand Down
23 changes: 8 additions & 15 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type RegisterOptions struct {
EnforceIndex bool
ModifyIndex uint64
PolicyOverride bool
PreserveCounts bool
}

// Register is used to register a new job. It returns the ID
Expand All @@ -105,17 +106,16 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
// of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
// Format the request
req := &RegisterJobRequest{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JobRegisterRequest has WriteRequest, but we don't use it. Otherwise, has the same shape as RegisterJobRequest

req := &JobRegisterRequest{
Job: job,
}
if opts != nil {
if opts.EnforceIndex {
req.EnforceIndex = true
req.JobModifyIndex = opts.ModifyIndex
}
if opts.PolicyOverride {
req.PolicyOverride = true
}
req.PolicyOverride = opts.PolicyOverride
req.PreserveCounts = opts.PreserveCounts
}

var resp JobRegisterResponse
Expand Down Expand Up @@ -1035,25 +1035,18 @@ type JobRevertRequest struct {
WriteRequest
}

// JobUpdateRequest is used to update a job
// JobRegisterRequest is used to update a job
type JobRegisterRequest struct {
Job *Job
// If EnforceIndex is set then the job will only be registered if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero, the
// register only occurs if the job is new.
EnforceIndex bool
JobModifyIndex uint64
PolicyOverride bool

WriteRequest
}

// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
Job *Job
EnforceIndex bool `json:",omitempty"`
JobModifyIndex uint64 `json:",omitempty"`
PolicyOverride bool `json:",omitempty"`
PreserveCounts bool `json:",omitempty"`

WriteRequest
}

// JobRegisterResponse is used to respond to a job registration
Expand Down
142 changes: 142 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,148 @@ func TestJobs_Register(t *testing.T) {
}
}

func TestJobs_Register_PreserveCounts(t *testing.T) {
t.Parallel()
require := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Listing jobs before registering returns nothing
resp, _, err := jobs.List(nil)
require.Nil(err)
require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp))

// Create a job
task := NewTask("task", "exec").
SetConfig("command", "/bin/sleep").
Require(&Resources{
CPU: intToPtr(100),
MemoryMB: intToPtr(256),
}).
SetLogConfig(&LogConfig{
MaxFiles: intToPtr(1),
MaxFileSizeMB: intToPtr(2),
})

group1 := NewTaskGroup("group1", 1).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
group2 := NewTaskGroup("group2", 2).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})

job := NewBatchJob("job", "redis", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(group1).
AddTaskGroup(group2)

// Create a job and register it
resp2, wm, err := jobs.Register(job, nil)
require.Nil(err)
require.NotNil(resp2)
require.NotEmpty(resp2.EvalID)
assertWriteMeta(t, wm)

// Update the job, new groups to test PreserveCounts
group1.Count = nil
group2.Count = intToPtr(0)
group3 := NewTaskGroup("group3", 3).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
job.AddTaskGroup(group3)

// Update the job, with PreserveCounts = true
_, _, err = jobs.RegisterOpts(job, &RegisterOptions{
PreserveCounts: true,
}, nil)
require.NoError(err)

// Query the job scale status
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Equal(1, status.TaskGroups["group1"].Desired) // present and nil => preserved
require.Equal(2, status.TaskGroups["group2"].Desired) // present and specified => preserved
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specific in job spec
}

func TestJobs_Register_NoPreserveCounts(t *testing.T) {
t.Parallel()
require := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Listing jobs before registering returns nothing
resp, _, err := jobs.List(nil)
require.Nil(err)
require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp))

// Create a job
task := NewTask("task", "exec").
SetConfig("command", "/bin/sleep").
Require(&Resources{
CPU: intToPtr(100),
MemoryMB: intToPtr(256),
}).
SetLogConfig(&LogConfig{
MaxFiles: intToPtr(1),
MaxFileSizeMB: intToPtr(2),
})

group1 := NewTaskGroup("group1", 1).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
group2 := NewTaskGroup("group2", 2).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})

job := NewBatchJob("job", "redis", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(group1).
AddTaskGroup(group2)

// Create a job and register it
resp2, wm, err := jobs.Register(job, nil)
require.Nil(err)
require.NotNil(resp2)
require.NotEmpty(resp2.EvalID)
assertWriteMeta(t, wm)

// Update the job, new groups to test PreserveCounts
group1.Count = intToPtr(0)
group2.Count = nil
group3 := NewTaskGroup("group3", 3).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
job.AddTaskGroup(group3)

// Update the job, with PreserveCounts = default [false]
_, _, err = jobs.Register(job, nil)
require.NoError(err)

// Query the job scale status
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Equal(0, status.TaskGroups["group1"].Desired) // present => as specified
require.Equal(1, status.TaskGroups["group2"].Desired) // nil => default (1)
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified
}

func TestJobs_Validate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
PreserveCounts: args.PreserveCounts,
WriteRequest: structs.WriteRequest{
Region: sJob.Region,
AuthToken: args.WriteRequest.SecretID,
Expand Down
6 changes: 5 additions & 1 deletion command/job_inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ func (c *JobInspectCommand) Run(args []string) int {
}

// Print the contents of the job
req := api.RegisterJobRequest{Job: job}
req := struct {
Job *api.Job
}{
Job: job,
}
Comment on lines +165 to +169
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need a class for deserializing a Job: -embedded job spec, without all of the other job registration options.

f, err := DataFormat("json", "")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error getting formatter: %s", err))
Expand Down
18 changes: 13 additions & 5 deletions command/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ Run Options:
-policy-override
Sets the flag to force override any soft mandatory Sentinel policies.

-preserve-counts
If set, the existing task group counts will be preserved when updating a job.

-consul-token
If set, the passed Consul token is stored in the job before sending to the
Nomad servers. This allows passing the Consul token without storing it in
Expand Down Expand Up @@ -118,6 +121,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags {
"-vault-token": complete.PredictAnything,
"-output": complete.PredictNothing,
"-policy-override": complete.PredictNothing,
"-preserve-counts": complete.PredictNothing,
})
}

Expand All @@ -128,7 +132,7 @@ func (c *JobRunCommand) AutocompleteArgs() complete.Predictor {
func (c *JobRunCommand) Name() string { return "job run" }

func (c *JobRunCommand) Run(args []string) int {
var detach, verbose, output, override bool
var detach, verbose, output, override, preserveCounts bool
var checkIndexStr, consulToken, vaultToken string

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
Expand All @@ -137,6 +141,7 @@ func (c *JobRunCommand) Run(args []string) int {
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&output, "output", false, "")
flags.BoolVar(&override, "policy-override", false, "")
flags.BoolVar(&preserveCounts, "preserve-counts", false, "")
flags.StringVar(&checkIndexStr, "check-index", "", "")
flags.StringVar(&consulToken, "consul-token", "", "")
flags.StringVar(&vaultToken, "vault-token", "", "")
Expand Down Expand Up @@ -208,7 +213,11 @@ func (c *JobRunCommand) Run(args []string) int {
}

if output {
req := api.RegisterJobRequest{Job: job}
req := struct {
Job *api.Job
}{
Job: job,
}
Comment on lines +216 to +220
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need a class for deserializing a Job: -embedded job spec, without all of the other job registration options.

buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
Expand All @@ -232,9 +241,8 @@ func (c *JobRunCommand) Run(args []string) int {
opts.EnforceIndex = true
opts.ModifyIndex = checkIndex
}
if override {
opts.PolicyOverride = true
}
opts.PolicyOverride = override
opts.PreserveCounts = preserveCounts

// Submit the job
resp, _, err := client.Jobs().RegisterOpts(job, opts, nil)
Expand Down
13 changes: 13 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Clear the Consul token
args.Job.ConsulToken = ""

// Preserve the existing task group counts, if so requested
if existingJob != nil && args.PreserveCounts {
prevCounts := make(map[string]int)
for _, tg := range existingJob.TaskGroups {
prevCounts[tg.Name] = tg.Count
}
for _, tg := range args.Job.TaskGroups {
if count, ok := prevCounts[tg.Name]; ok {
tg.Count = count
}
}
}

// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time
Expand Down
61 changes: 61 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,67 @@ func TestJobEndpoint_Register(t *testing.T) {
}
}

func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
t.Parallel()
require := require.New(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
job := mock.Job()
job.TaskGroups[0].Name = "group1"
job.TaskGroups[0].Count = 10
job.Canonicalize()

// Register the job
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}, &structs.JobRegisterResponse{}))

// Check the job in the FSM state
state := s1.fsm.State()
out, err := state.JobByID(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(out)
require.Equal(10, out.TaskGroups[0].Count)

// New version:
// new "group2" with 2 instances
// "group1" goes from 10 -> 0 in the spec
job = job.Copy()
job.TaskGroups[0].Count = 0 // 10 -> 0 in the job spec
job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy())
job.TaskGroups[1].Name = "group2"
job.TaskGroups[1].Count = 2

// Perform the update
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
Job: job,
PreserveCounts: true,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}, &structs.JobRegisterResponse{}))

// Check the job in the FSM state
out, err = state.JobByID(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(out)
require.Equal(10, out.TaskGroups[0].Count) // should not change
require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec
}


func TestJobEndpoint_Register_Connect(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,11 @@ type JobRegisterRequest struct {
EnforceIndex bool
JobModifyIndex uint64

// PreserveCounts indicates that during job update, existing task group
// counts should be preserved, over those specified in the new job spec
// PreserveCounts is ignored for newly created jobs.
PreserveCounts bool

// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool

Expand Down
Loading