diff --git a/jobspec/parse.go b/jobspec/parse.go index 7c98c22aae3..8a208f9864d 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -412,12 +412,13 @@ func parseConstraints(result *[]*api.Constraint, list *ast.ObjectList) error { // Check for invalid keys valid := []string{ "attribute", + "distinct_hosts", + "distinct_property", "operator", - "value", - "version", "regexp", - "distinct_hosts", "set_contains", + "value", + "version", } if err := checkHCLKeys(o.Val, valid); err != nil { return err @@ -467,6 +468,11 @@ func parseConstraints(result *[]*api.Constraint, list *ast.ObjectList) error { m["Operand"] = structs.ConstraintDistinctHosts } + if property, ok := m[structs.ConstraintDistinctProperty]; ok { + m["Operand"] = structs.ConstraintDistinctProperty + m["LTarget"] = property + } + // Build the constraint var c api.Constraint if err := mapstructure.WeakDecode(m, &c); err != nil { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 58d9225c155..6090dee5402 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -303,6 +303,21 @@ func TestParse(t *testing.T) { false, }, + { + "distinctProperty-constraint.hcl", + &api.Job{ + ID: helper.StringToPtr("foo"), + Name: helper.StringToPtr("foo"), + Constraints: []*api.Constraint{ + &api.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + }, + false, + }, + { "periodic-cron.hcl", &api.Job{ diff --git a/jobspec/test-fixtures/distinctProperty-constraint.hcl b/jobspec/test-fixtures/distinctProperty-constraint.hcl new file mode 100644 index 00000000000..d7cc1ababfc --- /dev/null +++ b/jobspec/test-fixtures/distinctProperty-constraint.hcl @@ -0,0 +1,5 @@ +job "foo" { + constraint { + distinct_property = "${meta.rack}" + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 56009051053..e96ea290214 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3189,10 +3189,11 @@ func (ta *TaskArtifact) Validate() error { } const ( - ConstraintDistinctHosts = "distinct_hosts" - ConstraintRegex = "regexp" - ConstraintVersion = "version" - ConstraintSetContains = "set_contains" + ConstraintDistinctProperty = "distinct_property" + ConstraintDistinctHosts = "distinct_hosts" + ConstraintRegex = "regexp" + ConstraintVersion = "version" + ConstraintSetContains = "set_contains" ) // Constraints are used to restrict placement options. diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 2ca75ee17d4..bd60fc2eaea 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -142,12 +142,10 @@ func (c *DriverChecker) hasDrivers(option *structs.Node) bool { return true } -// ProposedAllocConstraintIterator is a FeasibleIterator which returns nodes that -// match constraints that are not static such as Node attributes but are -// effected by proposed alloc placements. Examples are distinct_hosts and -// tenancy constraints. This is used to filter on job and task group -// constraints. -type ProposedAllocConstraintIterator struct { +// DistinctHostsIterator is a FeasibleIterator which returns nodes that pass the +// distinct_hosts constraint. The constraint ensures that multiple allocations +// do not exist on the same node. +type DistinctHostsIterator struct { ctx Context source FeasibleIterator tg *structs.TaskGroup @@ -159,44 +157,47 @@ type ProposedAllocConstraintIterator struct { jobDistinctHosts bool } -// NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator -// from a source. 
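The parse.go hunk above does one small thing: when the decoded HCL map carries a "distinct_property" key, its value becomes the constraint's LTarget and the Operand is forced to "distinct_property", mirroring how the existing distinct_hosts key sets its Operand. A minimal sketch of that promotion, assuming a plain map and a stand-in struct rather than the real api.Constraint and mapstructure decode:

package main

import "fmt"

// Constraint is a stand-in for api.Constraint; only the fields the parser
// fills in here (Operand, LTarget) matter for this sketch.
type Constraint struct {
	Operand string
	LTarget string
}

const constraintDistinctProperty = "distinct_property"

// promoteDistinctProperty mimics the parse.go hunk: a distinct_property key in
// the decoded HCL map becomes the LTarget, and the Operand is set accordingly.
func promoteDistinctProperty(m map[string]interface{}) Constraint {
	c := Constraint{}
	if v, ok := m[constraintDistinctProperty]; ok {
		c.Operand = constraintDistinctProperty
		c.LTarget = v.(string)
	}
	return c
}

func main() {
	// Roughly what `distinct_property = "${meta.rack}"` decodes to.
	m := map[string]interface{}{constraintDistinctProperty: "${meta.rack}"}
	fmt.Printf("%+v\n", promoteDistinctProperty(m))
}

Run against a map decoded from the new test fixture, this prints Operand "distinct_property" and LTarget "${meta.rack}", which is what the added parse test asserts.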
-func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *ProposedAllocConstraintIterator { - return &ProposedAllocConstraintIterator{ +// NewDistinctHostsIterator creates a DistinctHostsIterator from a source. +func NewDistinctHostsIterator(ctx Context, source FeasibleIterator) *DistinctHostsIterator { + return &DistinctHostsIterator{ ctx: ctx, source: source, } } -func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) { +func (iter *DistinctHostsIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints) } -func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) { +func (iter *DistinctHostsIterator) SetJob(job *structs.Job) { iter.job = job iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints) } -func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { +func (iter *DistinctHostsIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { for _, con := range constraints { if con.Operand == structs.ConstraintDistinctHosts { return true } } + return false } -func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { +func (iter *DistinctHostsIterator) Next() *structs.Node { for { // Get the next option from the source option := iter.source.Next() - // Hot-path if the option is nil or there are no distinct_hosts constraints. - if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts) { + // Hot-path if the option is nil or there are no distinct_hosts or + // distinct_property constraints. + hosts := iter.jobDistinctHosts || iter.tgDistinctHosts + if option == nil || !hosts { return option } + // Check if the host constraints are satisfied if !iter.satisfiesDistinctHosts(option) { iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts) continue @@ -208,7 +209,7 @@ func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { // satisfiesDistinctHosts checks if the node satisfies a distinct_hosts // constraint either specified at the job level or the TaskGroup level. -func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *structs.Node) bool { +func (iter *DistinctHostsIterator) satisfiesDistinctHosts(option *structs.Node) bool { // Check if there is no constraint set. if !(iter.jobDistinctHosts || iter.tgDistinctHosts) { return true @@ -237,8 +238,113 @@ func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *stru return true } -func (iter *ProposedAllocConstraintIterator) Reset() { +func (iter *DistinctHostsIterator) Reset() { + iter.source.Reset() +} + +// DistinctPropertyIterator is a FeasibleIterator which returns nodes that pass the +// distinct_property constraint. The constraint ensures that multiple allocations +// do not use the same value of the given property. +type DistinctPropertyIterator struct { + ctx Context + source FeasibleIterator + tg *structs.TaskGroup + job *structs.Job + + hasDistinctPropertyConstraints bool + jobPropertySets []*propertySet + groupPropertySets map[string][]*propertySet +} + +// NewDistinctPropertyIterator creates a DistinctPropertyIterator from a source. 
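Both iterators share one shape: pull an option from the source, return it untouched when no relevant constraint is set (the hot path), and otherwise skip options that fail the check. A self-contained sketch of that shape; the interface and types here are simplified stand-ins, not the scheduler's FeasibleIterator or structs.Node:

package main

import "fmt"

// node and iterator are simplified stand-ins for structs.Node and
// FeasibleIterator.
type node struct{ ID string }

type iterator interface {
	Next() *node
}

// sliceIterator yields nodes from a fixed slice, like StaticIterator.
type sliceIterator struct {
	nodes []*node
	i     int
}

func (s *sliceIterator) Next() *node {
	if s.i >= len(s.nodes) {
		return nil
	}
	n := s.nodes[s.i]
	s.i++
	return n
}

// filterIterator wraps a source and drops nodes failing check, mirroring the
// Next() loop in DistinctHostsIterator and DistinctPropertyIterator.
type filterIterator struct {
	source  iterator
	enabled bool // hot-path flag, like jobDistinctHosts || tgDistinctHosts
	check   func(*node) bool
}

func (f *filterIterator) Next() *node {
	for {
		option := f.source.Next()
		// Hot path: the source is exhausted or there is nothing to check.
		if option == nil || !f.enabled {
			return option
		}
		if !f.check(option) {
			continue // infeasible, try the next option
		}
		return option
	}
}

func main() {
	src := &sliceIterator{nodes: []*node{{ID: "a"}, {ID: "b"}, {ID: "c"}}}
	used := map[string]bool{"a": true} // pretend "a" already hosts an alloc
	it := &filterIterator{source: src, enabled: true, check: func(n *node) bool { return !used[n.ID] }}
	for n := it.Next(); n != nil; n = it.Next() {
		fmt.Println("feasible:", n.ID)
	}
}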
+func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *DistinctPropertyIterator { + return &DistinctPropertyIterator{ + ctx: ctx, + source: source, + groupPropertySets: make(map[string][]*propertySet), + } +} + +func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { + iter.tg = tg + + // Build the property set at the taskgroup level + if _, ok := iter.groupPropertySets[tg.Name]; !ok { + for _, c := range tg.Constraints { + if c.Operand != structs.ConstraintDistinctProperty { + continue + } + + pset := NewPropertySet(iter.ctx, iter.job) + pset.SetTGConstraint(c, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) + } + } + + // Check if there is a distinct property + iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0 +} + +func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { + iter.job = job + + // Build the property set at the job level + for _, c := range job.Constraints { + if c.Operand != structs.ConstraintDistinctProperty { + continue + } + + pset := NewPropertySet(iter.ctx, job) + pset.SetJobConstraint(c) + iter.jobPropertySets = append(iter.jobPropertySets, pset) + } +} + +func (iter *DistinctPropertyIterator) Next() *structs.Node { + for { + // Get the next option from the source + option := iter.source.Next() + + // Hot path if there is nothing to check + if option == nil || !iter.hasDistinctPropertyConstraints { + return option + } + + // Check if the constraints are met + if !iter.satisfiesProperties(option, iter.jobPropertySets) || + !iter.satisfiesProperties(option, iter.groupPropertySets[iter.tg.Name]) { + continue + } + + return option + } +} + +// satisfiesProperties returns whether the option satisfies the set of +// properties. If not it will be filtered. +func (iter *DistinctPropertyIterator) satisfiesProperties(option *structs.Node, set []*propertySet) bool { + for _, ps := range set { + if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { + iter.ctx.Metrics().FilterNode(option, reason) + return false + } + } + + return true +} + +func (iter *DistinctPropertyIterator) Reset() { iter.source.Reset() + + for _, ps := range iter.jobPropertySets { + ps.PopulateProposed() + } + + for _, sets := range iter.groupPropertySets { + for _, ps := range sets { + ps.PopulateProposed() + } + } } // ConstraintChecker is a FeasibilityChecker which returns nodes that match a @@ -327,7 +433,7 @@ func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bo func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { // Check for constraints not handled by this checker. 
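The comment above is the flip side of the new iterators: operands that depend on other allocations rather than on the node itself are deliberately passed through by the generic checker, because DistinctHostsIterator and DistinctPropertyIterator enforce them. A rough sketch of that dispatch; the equality operands stand in for the ones the checker really evaluates, and the function name is illustrative:

package main

import "fmt"

// checkOperand sketches why the generic constraint check returns true for the
// distinct_* operands: they depend on other allocations, not on the node
// alone, so dedicated iterators enforce them and this check must not veto the
// node on their behalf.
func checkOperand(operand string, lVal, rVal string) bool {
	switch operand {
	case "distinct_hosts", "distinct_property":
		return true // handled by DistinctHostsIterator / DistinctPropertyIterator
	case "=", "==", "is":
		return lVal == rVal
	default:
		return false // unknown operand: treated as unsatisfied in this sketch
	}
}

func main() {
	fmt.Println(checkOperand("distinct_property", "${meta.rack}", "")) // true: deferred
	fmt.Println(checkOperand("=", "linux", "linux"))                   // true
}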
switch operand { - case structs.ConstraintDistinctHosts: + case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: return true default: break diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index e6fe430e47d..097bfc175bf 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "fmt" "reflect" "testing" @@ -434,13 +435,14 @@ func TestCheckRegexpConstraint(t *testing.T) { } } -func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) { +// This test puts allocations on the node to test if it detects infeasibility of +// nodes correctly and picks the only feasible one +func TestDistinctHostsIterator_JobDistinctHosts(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), mock.Node(), mock.Node(), - mock.Node(), } static := NewStaticIterator(ctx, nodes) @@ -454,25 +456,53 @@ func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) { TaskGroups: []*structs.TaskGroup{tg1, tg2}, } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(tg1) - propsed.SetJob(job) + // Add allocs placing tg1 on node1 and tg2 on node2. This should make the + // job unsatisfiable on all nodes but node3 + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + }, + } + plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + }, + } + + proposed := NewDistinctHostsIterator(ctx, static) + proposed.SetTaskGroup(tg1) + proposed.SetJob(job) - out := collectFeasible(propsed) - if len(out) != 4 { + out := collectFeasible(proposed) + if len(out) != 1 { t.Fatalf("Bad: %#v", out) } - selected := make(map[string]struct{}, 4) - for _, option := range out { - if _, ok := selected[option.ID]; ok { - t.Fatalf("selected node %v for more than one alloc", option) - } - selected[option.ID] = struct{}{} + if out[0].ID != nodes[2].ID { + t.Fatalf("wrong node picked") } } -func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { +func TestDistinctHostsIterator_JobDistinctHosts_InfeasibleCount(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -480,18 +510,19 @@ func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { } static := NewStaticIterator(ctx, nodes) - // Create a job with a distinct_hosts constraint and two task groups. + // Create a job with a distinct_hosts constraint and three task groups. tg1 := &structs.TaskGroup{Name: "bar"} tg2 := &structs.TaskGroup{Name: "baz"} + tg3 := &structs.TaskGroup{Name: "bam"} job := &structs.Job{ ID: "foo", Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}}, - TaskGroups: []*structs.TaskGroup{tg1, tg2}, + TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3}, } // Add allocs placing tg1 on node1 and tg2 on node2. This should make the - // job unsatisfiable. 
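The setup below is what the iterator actually inspects: the plan's NodeAllocation map proposes allocations per node, and only allocations of the same job count against distinct_hosts — job-wide when the constraint sits on the job, per task group when it sits on the group. A self-contained sketch of that check, with a trimmed-down stand-in for structs.Allocation:

package main

import "fmt"

// alloc is a stand-in carrying only the fields the distinct_hosts check reads.
type alloc struct {
	JobID     string
	TaskGroup string
}

// violatesDistinctHosts reports whether placing taskGroup of jobID on a node
// would collide with an allocation of the same job already proposed there.
func violatesDistinctHosts(proposedOnNode []alloc, jobID, taskGroup string, jobLevel bool) bool {
	for _, a := range proposedOnNode {
		if a.JobID != jobID {
			continue // other jobs never conflict
		}
		// Job-level distinct_hosts: any group of the job blocks the node.
		// Group-level: only the same task group blocks it.
		if jobLevel || a.TaskGroup == taskGroup {
			return true
		}
	}
	return false
}

func main() {
	node1 := []alloc{{JobID: "foo", TaskGroup: "bar"}, {JobID: "other", TaskGroup: "bar"}}
	fmt.Println(violatesDistinctHosts(node1, "foo", "baz", true))  // true: job-level blocks the node
	fmt.Println(violatesDistinctHosts(node1, "foo", "baz", false)) // false: different task group
}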
+ // job unsatisfiable for tg3 plan := ctx.Plan() plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ &structs.Allocation{ @@ -499,19 +530,207 @@ func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { JobID: job.ID, ID: structs.GenerateUUID(), }, + } + plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + }, + } + + proposed := NewDistinctHostsIterator(ctx, static) + proposed.SetTaskGroup(tg3) + proposed.SetJob(job) + + // It should not be able to place 3 tasks with only two nodes. + out := collectFeasible(proposed) + if len(out) != 0 { + t.Fatalf("Bad: %#v", out) + } +} + +func TestDistinctHostsIterator_TaskGroupDistinctHosts(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + static := NewStaticIterator(ctx, nodes) + + // Create a task group with a distinct_hosts constraint. + tg1 := &structs.TaskGroup{ + Name: "example", + Constraints: []*structs.Constraint{ + {Operand: structs.ConstraintDistinctHosts}, + }, + } + tg2 := &structs.TaskGroup{Name: "baz"} + + // Add a planned alloc to node1. + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "foo", + }, + } + + // Add a planned alloc to node2 with the same task group name but a + // different job. + plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "bar", + }, + } + + proposed := NewDistinctHostsIterator(ctx, static) + proposed.SetTaskGroup(tg1) + proposed.SetJob(&structs.Job{ID: "foo"}) + + out := collectFeasible(proposed) + if len(out) != 1 { + t.Fatalf("Bad: %#v", out) + } + + // Expect it to skip the first node as there is a previous alloc on it for + // the same task group. + if out[0] != nodes[1] { + t.Fatalf("Bad: %v", out) + } + + // Since the other task group doesn't have the constraint, both nodes should + // be feasible. + proposed.Reset() + proposed.SetTaskGroup(tg2) + out = collectFeasible(proposed) + if len(out) != 2 { + t.Fatalf("Bad: %#v", out) + } +} + +// This test puts creates allocations across task groups that use a property +// value to detect if the constraint at the job level properly considers all +// task groups. +func TestDistinctPropertyIterator_JobDistinctProperty(t *testing.T) { + state, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + + for i, n := range nodes { + n.Meta["rack"] = fmt.Sprintf("%d", i) + + // Add to state store + if err := state.UpsertNode(uint64(100+i), n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + } + + static := NewStaticIterator(ctx, nodes) + + // Create a job with a distinct_property constraint and a task groups. + tg1 := &structs.TaskGroup{Name: "bar"} + tg2 := &structs.TaskGroup{Name: "baz"} + + job := &structs.Job{ + ID: "foo", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + TaskGroups: []*structs.TaskGroup{tg1, tg2}, + } + + // Add allocs placing tg1 on node1 and 2 and tg2 on node3 and 4. This should make the + // job unsatisfiable on all nodes but node5. Also mix the allocations + // existing in the plan and the state store. 
+ plan := ctx.Plan() + alloc1ID := structs.GenerateUUID() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: alloc1ID, + NodeID: nodes[0].ID, + }, // Should be ignored as it is a different job. &structs.Allocation{ TaskGroup: tg2.Name, JobID: "ignore 2", ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, }, } - plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ + plan.NodeAllocation[nodes[2].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + } + + // Put an allocation on Node 5 but make it stopped in the plan + stoppingAllocID := structs.GenerateUUID() + plan.NodeUpdate[nodes[4].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: stoppingAllocID, + NodeID: nodes[4].ID, + }, + } + + upserting := []*structs.Allocation{ + // Have one of the allocations exist in both the plan and the state + // store. This resembles an allocation update + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: alloc1ID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, &structs.Allocation{ TaskGroup: tg2.Name, JobID: job.ID, ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[3].ID, }, // Should be ignored as it is a different job. @@ -519,96 +738,296 @@ func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { TaskGroup: tg1.Name, JobID: "ignore 2", ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[3].ID, + }, + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: stoppingAllocID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[4].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + proposed := NewDistinctPropertyIterator(ctx, static) + proposed.SetJob(job) + proposed.SetTaskGroup(tg2) + proposed.Reset() + + out := collectFeasible(proposed) + if len(out) != 1 { + t.Fatalf("Bad: %#v", out) + } + if out[0].ID != nodes[4].ID { + t.Fatalf("wrong node picked") + } +} + +// This test checks that if a node has an allocation on it that gets stopped, +// there is a plan to re-use that for a new allocation, that the next select +// won't select that node. +func TestDistinctPropertyIterator_JobDistinctProperty_RemoveAndReplace(t *testing.T) { + state, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + + nodes[0].Meta["rack"] = "1" + + // Add to state store + if err := state.UpsertNode(uint64(100), nodes[0]); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + + static := NewStaticIterator(ctx, nodes) + + // Create a job with a distinct_property constraint and a task groups. 
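The scenario being assembled here is the subtle one: the plan stops the old allocation, which would free its rack value, but also proposes a replacement on the same node, which takes the value right back. That is the set arithmetic the propertySet added later in this diff performs — values held by existing allocations, minus values freed by planned stops, plus values taken by planned placements, where a stop only frees a value if nothing proposed re-takes it. A small sketch of that bookkeeping, independent of the scheduler types:

package main

import "fmt"

// usedValues mirrors propertySet's bookkeeping: a property value is in use if
// an existing or proposed allocation holds it, except when it was only held by
// an allocation the plan is stopping and nothing proposed re-takes it.
func usedValues(existing, stopping, proposed map[string]struct{}) map[string]struct{} {
	cleared := make(map[string]struct{})
	for v := range stopping {
		cleared[v] = struct{}{}
	}
	// A proposed allocation re-taking a value cancels the clear.
	for v := range proposed {
		delete(cleared, v)
	}

	used := make(map[string]struct{})
	for v := range existing {
		if _, ok := cleared[v]; !ok {
			used[v] = struct{}{}
		}
	}
	for v := range proposed {
		used[v] = struct{}{}
	}
	return used
}

func main() {
	existing := map[string]struct{}{"rack1": {}}
	stopping := map[string]struct{}{"rack1": {}}

	// Stop only: rack1 becomes available again.
	fmt.Println(usedValues(existing, stopping, nil)) // map[]

	// Stop and replace on the same rack: rack1 stays used.
	proposed := map[string]struct{}{"rack1": {}}
	fmt.Println(usedValues(existing, stopping, proposed)) // map[rack1:{}]
}

The second call mirrors the RemoveAndReplace test: a stop plus a replacement on the same rack leaves the value occupied, so no node is feasible.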
+ tg1 := &structs.TaskGroup{Name: "bar"} + job := &structs.Job{ + ID: "foo", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + TaskGroups: []*structs.TaskGroup{tg1}, + } + + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + + stoppingAllocID := structs.GenerateUUID() + plan.NodeUpdate[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + NodeID: nodes[0].ID, }, } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(tg1) - propsed.SetJob(job) + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } - out := collectFeasible(propsed) + proposed := NewDistinctPropertyIterator(ctx, static) + proposed.SetJob(job) + proposed.SetTaskGroup(tg1) + proposed.Reset() + + out := collectFeasible(proposed) if len(out) != 0 { t.Fatalf("Bad: %#v", out) } } -func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) { - _, ctx := testContext(t) +// This test creates previous allocations selecting certain property values to +// test if it detects infeasibility of property values correctly and picks the +// only feasible one +func TestDistinctPropertyIterator_JobDistinctProperty_Infeasible(t *testing.T) { + state, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), mock.Node(), } + + for i, n := range nodes { + n.Meta["rack"] = fmt.Sprintf("%d", i) + + // Add to state store + if err := state.UpsertNode(uint64(100+i), n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + } + static := NewStaticIterator(ctx, nodes) - // Create a job with a distinct_hosts constraint and three task groups. + // Create a job with a distinct_property constraint and a task groups. tg1 := &structs.TaskGroup{Name: "bar"} tg2 := &structs.TaskGroup{Name: "baz"} tg3 := &structs.TaskGroup{Name: "bam"} job := &structs.Job{ - ID: "foo", - Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}}, - TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3}, + ID: "foo", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3}, + } + + // Add allocs placing tg1 on node1 and tg2 on node2. This should make the + // job unsatisfiable for tg3. 
+ plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(tg1) - propsed.SetJob(job) + proposed := NewDistinctPropertyIterator(ctx, static) + proposed.SetJob(job) + proposed.SetTaskGroup(tg3) + proposed.Reset() - // It should not be able to place 3 tasks with only two nodes. - out := collectFeasible(propsed) - if len(out) != 2 { + out := collectFeasible(proposed) + if len(out) != 0 { t.Fatalf("Bad: %#v", out) } } -func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { - _, ctx := testContext(t) +// This test creates previous allocations selecting certain property values to +// test if it detects infeasibility of property values correctly and picks the +// only feasible one when the constraint is at the task group. +func TestDistinctPropertyIterator_TaskGroupDistinctProperty(t *testing.T) { + state, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), mock.Node(), + mock.Node(), } + + for i, n := range nodes { + n.Meta["rack"] = fmt.Sprintf("%d", i) + + // Add to state store + if err := state.UpsertNode(uint64(100+i), n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + } + static := NewStaticIterator(ctx, nodes) - // Create a task group with a distinct_hosts constraint. - taskGroup := &structs.TaskGroup{ + // Create a job with a task group with the distinct_property constraint + tg1 := &structs.TaskGroup{ Name: "example", Constraints: []*structs.Constraint{ - {Operand: structs.ConstraintDistinctHosts}, + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, }, } + tg2 := &structs.TaskGroup{Name: "baz"} - // Add a planned alloc to node1. + job := &structs.Job{ + ID: "foo", + TaskGroups: []*structs.TaskGroup{tg1, tg2}, + } + + // Add allocs placing tg1 on node1 and 2. This should make the + // job unsatisfiable on all nodes but node3. Also mix the allocations + // existing in the plan and the state store. plan := ctx.Plan() plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ &structs.Allocation{ - TaskGroup: taskGroup.Name, - JobID: "foo", + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, }, } - // Add a planned alloc to node2 with the same task group name but a - // different job. 
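Allocations that belong to a different job never count against a distinct constraint, and terminal allocations are dropped when collecting the values already in use; the propertySet introduced later filters on both before reading node properties. A hedged sketch folding the per-job lookup and that filter into one function (the type and field names are trimmed stand-ins):

package main

import "fmt"

// alloc is a stand-in carrying only the fields the filter looks at.
type alloc struct {
	JobID     string
	TaskGroup string
	Terminal  bool
}

// relevantAllocs keeps allocations of the given job (and task group, when the
// constraint is group-scoped), optionally dropping terminal ones — roughly the
// filtering propertySet applies before collecting property values.
func relevantAllocs(allocs []alloc, jobID, taskGroup string, dropTerminal bool) []alloc {
	var out []alloc
	for _, a := range allocs {
		if a.JobID != jobID {
			continue
		}
		if taskGroup != "" && a.TaskGroup != taskGroup {
			continue
		}
		if dropTerminal && a.Terminal {
			continue
		}
		out = append(out, a)
	}
	return out
}

func main() {
	allocs := []alloc{
		{JobID: "foo", TaskGroup: "bar"},
		{JobID: "ignore 2", TaskGroup: "bar"},            // different job: ignored
		{JobID: "foo", TaskGroup: "bar", Terminal: true}, // stopped: ignored for existing values
	}
	fmt.Println(len(relevantAllocs(allocs, "foo", "bar", true))) // 1
}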
- plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ + // Put an allocation on Node 3 but make it stopped in the plan + stoppingAllocID := structs.GenerateUUID() + plan.NodeUpdate[nodes[2].ID] = []*structs.Allocation{ &structs.Allocation{ - TaskGroup: taskGroup.Name, - JobID: "bar", + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + NodeID: nodes[2].ID, }, } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(taskGroup) - propsed.SetJob(&structs.Job{ID: "foo"}) + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, - out := collectFeasible(propsed) + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + proposed := NewDistinctPropertyIterator(ctx, static) + proposed.SetJob(job) + proposed.SetTaskGroup(tg1) + proposed.Reset() + + out := collectFeasible(proposed) if len(out) != 1 { t.Fatalf("Bad: %#v", out) } + if out[0].ID != nodes[2].ID { + t.Fatalf("wrong node picked") + } - // Expect it to skip the first node as there is a previous alloc on it for - // the same task group. - if out[0] != nodes[1] { - t.Fatalf("Bad: %v", out) + // Since the other task group doesn't have the constraint, both nodes should + // be feasible. + proposed.SetTaskGroup(tg2) + proposed.Reset() + + out = collectFeasible(proposed) + if len(out) != 3 { + t.Fatalf("Bad: %#v", out) } } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 36ec50b0fc1..6df199d6469 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -229,6 +229,253 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DistinctHosts(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct host and has count 1 higher than what is + // possible. 
+ job := mock.Job() + job.TaskGroups[0].Count = 11 + job.Constraints = append(job.Constraints, &structs.Constraint{Operand: structs.ConstraintDistinctHosts}) + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the eval has spawned blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan failed to alloc + outEval := h.Evals[0] + if len(outEval.FailedTGAllocs) != 1 { + t.Fatalf("bad: %+v", outEval) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + // Ensure different node was used per. + used := make(map[string]struct{}) + for _, alloc := range out { + if _, ok := used[alloc.NodeID]; ok { + t.Fatalf("Node collision %v", alloc.NodeID) + } + used[alloc.NodeID] = struct{}{} + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestServiceSched_JobRegister_DistinctProperty(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + rack := "rack2" + if i < 5 { + rack = "rack1" + } + node.Meta["rack"] = rack + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. + job := mock.Job() + job.TaskGroups[0].Count = 4 + job.Constraints = append(job.Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }) + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval has spawned blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan failed to alloc + outEval := h.Evals[0] + if len(outEval.FailedTGAllocs) != 1 { + t.Fatalf("bad: %+v", outEval) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 2 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + + // Ensure different node was used per. + used := make(map[string]struct{}) + for _, alloc := range out { + if _, ok := used[alloc.NodeID]; ok { + t.Fatalf("Node collision %v", alloc.NodeID) + } + used[alloc.NodeID] = struct{}{} + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestServiceSched_JobRegister_DistinctProperty_TaskGroup(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 2; i++ { + node := mock.Node() + node.Meta["ssd"] = "true" + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. + job := mock.Job() + job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy()) + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Constraints = append(job.TaskGroups[0].Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.ssd}", + }) + + job.TaskGroups[1].Name = "tg2" + job.TaskGroups[1].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval hasn't spawned blocked eval + if len(h.CreateEvals) != 0 { + t.Fatalf("bad: %#v", h.CreateEvals[0]) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 2 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) @@ -1318,6 +1565,112 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { } } +func TestServiceSched_JobModify_DistinctProperty(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + node.Meta["rack"] = fmt.Sprintf("rack%d", i) + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. 
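The task-group-scoped test just above hinges on each group tracking its own values: tg1's constraint on ${meta.ssd} keeps tg1 from repeating a value, while tg2, which carries no constraint, can land on the same value freely. A toy sketch of that scoping; the helper and its bookkeeping are illustrative only:

package main

import "fmt"

// tryPlace records that a property value is now used within a scope — the
// whole job for a job-level constraint, a single task group for a group-level
// one — and reports false if the scope has already used that value.
func tryPlace(scopeUsed map[string]map[string]struct{}, scope, value string) bool {
	used, ok := scopeUsed[scope]
	if !ok {
		used = make(map[string]struct{})
		scopeUsed[scope] = used
	}
	if _, taken := used[value]; taken {
		return false
	}
	used[value] = struct{}{}
	return true
}

func main() {
	used := make(map[string]map[string]struct{})

	// Group-level constraint on tg1: it cannot repeat ssd=true.
	fmt.Println(tryPlace(used, "tg1", "true")) // true
	fmt.Println(tryPlace(used, "tg1", "true")) // false: tg1 already used this value
	// tg2 has no constraint of its own in the test, so nothing stops it from
	// sharing the value (modeled here simply as a separate scope).
	fmt.Println(tryPlace(used, "tg2", "true")) // true
}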
+ job := mock.Job() + job.TaskGroups[0].Count = 11 + job.Constraints = append(job.Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }) + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + oldJob := job.Copy() + oldJob.JobModifyIndex -= 1 + oldJob.TaskGroups[0].Count = 4 + + // Place 4 of 10 + var allocs []*structs.Allocation + for i := 0; i < 4; i++ { + alloc := mock.Alloc() + alloc.Job = oldJob + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval hasn't spawned blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan failed to alloc + outEval := h.Evals[0] + if len(outEval.FailedTGAllocs) != 1 { + t.Fatalf("bad: %+v", outEval) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", planned) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + // Ensure different node was used per. + used := make(map[string]struct{}) + for _, alloc := range out { + if _, ok := used[alloc.NodeID]; ok { + t.Fatalf("Node collision %v", alloc.NodeID) + } + used[alloc.NodeID] = struct{}{} + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobDeregister(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go new file mode 100644 index 00000000000..95ed7f9c706 --- /dev/null +++ b/scheduler/propertyset.go @@ -0,0 +1,265 @@ +package scheduler + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/structs" +) + +// propertySet is used to track the values used for a particular property. +type propertySet struct { + // ctx is used to lookup the plan and state + ctx Context + + // jobID is the job we are operating on + jobID string + + // taskGroup is optionally set if the constraint is for a task group + taskGroup string + + // constraint is the constraint this property set is checking + constraint *structs.Constraint + + // errorBuilding marks whether there was an error when building the property + // set + errorBuilding error + + // existingValues is the set of values for the given property that have been + // used by pre-existing allocations. + existingValues map[string]struct{} + + // proposedValues is the set of values for the given property that are used + // from proposed allocations. 
+ proposedValues map[string]struct{} + + // clearedValues is the set of values that are no longer being used by + // existingValues because of proposed stops. + clearedValues map[string]struct{} +} + +// NewPropertySet returns a new property set used to guarantee unique property +// values for new allocation placements. +func NewPropertySet(ctx Context, job *structs.Job) *propertySet { + p := &propertySet{ + ctx: ctx, + jobID: job.ID, + existingValues: make(map[string]struct{}), + } + + return p +} + +// SetJobConstraint is used to parameterize the property set for a +// distinct_property constraint set at the job level. +func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) { + // Store the constraint + p.constraint = constraint + p.populateExisting(constraint) +} + +// SetTGConstraint is used to parameterize the property set for a +// distinct_property constraint set at the task group level. The inputs are the +// constraint and the task group name. +func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) { + // Store that this is for a task group + p.taskGroup = taskGroup + + // Store the constraint + p.constraint = constraint + + p.populateExisting(constraint) +} + +// populateExisting is a helper shared when setting the constraint to populate +// the existing values. +func (p *propertySet) populateExisting(constraint *structs.Constraint) { + // Retrieve all previously placed allocations + ws := memdb.NewWatchSet() + allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) + if err != nil { + p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err) + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding) + return + } + + // Filter to the correct set of allocs + allocs = p.filterAllocs(allocs, true) + + // Get all the nodes that have been used by the allocs + nodes, err := p.buildNodeMap(allocs) + if err != nil { + p.errorBuilding = err + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) + return + } + + // Build existing properties map + p.populateProperties(allocs, nodes, p.existingValues) +} + +// PopulateProposed populates the proposed values and recomputes any cleared +// value. It should be called whenever the plan is updated to ensure correct +// results when checking an option. +func (p *propertySet) PopulateProposed() { + + // Reset the proposed properties + p.proposedValues = make(map[string]struct{}) + p.clearedValues = make(map[string]struct{}) + + // Gather the set of proposed stops. + var stopping []*structs.Allocation + for _, updates := range p.ctx.Plan().NodeUpdate { + stopping = append(stopping, updates...) + } + stopping = p.filterAllocs(stopping, false) + + // Gather the proposed allocations + var proposed []*structs.Allocation + for _, pallocs := range p.ctx.Plan().NodeAllocation { + proposed = append(proposed, pallocs...) + } + proposed = p.filterAllocs(proposed, true) + + // Get the used nodes + both := make([]*structs.Allocation, 0, len(stopping)+len(proposed)) + both = append(both, stopping...) + both = append(both, proposed...) 
+ nodes, err := p.buildNodeMap(both) + if err != nil { + p.errorBuilding = err + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) + return + } + + // Populate the cleared values + p.populateProperties(stopping, nodes, p.clearedValues) + + // Populate the proposed values + p.populateProperties(proposed, nodes, p.proposedValues) + + // Remove any cleared value that is now being used by the proposed allocs + for value := range p.proposedValues { + delete(p.clearedValues, value) + } +} + +// SatisfiesDistinctProperties checks if the option satisfies the +// distinct_property constraints given the existing placements and proposed +// placements. If the option does not satisfy the constraints an explanation is +// given. +func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { + // Check if there was an error building + if p.errorBuilding != nil { + return false, p.errorBuilding.Error() + } + + // Get the nodes property value + nValue, ok := getProperty(option, p.constraint.LTarget) + if !ok { + return false, fmt.Sprintf("missing property %q", p.constraint.LTarget) + } + + // both is used to iterate over both the proposed and existing used + // properties + bothAll := []map[string]struct{}{p.existingValues, p.proposedValues} + + // Check if the nodes value has already been used. + for _, usedProperties := range bothAll { + // Check if the nodes value has been used + _, used := usedProperties[nValue] + if !used { + continue + } + + // Check if the value has been cleared from a proposed stop + if _, cleared := p.clearedValues[nValue]; cleared { + continue + } + + return false, fmt.Sprintf("distinct_property: %s=%s already used", p.constraint.LTarget, nValue) + } + + return true, "" +} + +// filterAllocs filters a set of allocations to just be those that are running +// and if the property set is operation at a task group level, for allocations +// for that task group +func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation { + n := len(allocs) + for i := 0; i < n; i++ { + remove := false + if filterTerminal { + remove = allocs[i].TerminalStatus() + } + + // If the constraint is on the task group filter the allocations to just + // those on the task group + if p.taskGroup != "" { + remove = remove || allocs[i].TaskGroup != p.taskGroup + } + + if remove { + allocs[i], allocs[n-1] = allocs[n-1], nil + i-- + n-- + } + } + return allocs[:n] +} + +// buildNodeMap takes a list of allocations and returns a map of the nodes used +// by those allocations +func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) { + // Get all the nodes that have been used by the allocs + nodes := make(map[string]*structs.Node) + ws := memdb.NewWatchSet() + for _, alloc := range allocs { + if _, ok := nodes[alloc.NodeID]; ok { + continue + } + + node, err := p.ctx.State().NodeByID(ws, alloc.NodeID) + if err != nil { + return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err) + } + + nodes[alloc.NodeID] = node + } + + return nodes, nil +} + +// populateProperties goes through all allocations and builds up the used +// properties from the nodes storing the results in the passed properties map. 
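getProperty below leans on resolveConstraintTarget, so a target such as "${meta.rack}" ends up reading the node's Meta map. A narrow, self-contained approximation of that lookup for meta targets only — the real resolver also handles attributes and other node fields:

package main

import (
	"fmt"
	"strings"
)

// metaProperty resolves a "${meta.<key>}" target against a node's metadata
// map; anything else is reported as unresolved. This is a deliberately narrow
// stand-in for the scheduler's resolveConstraintTarget.
func metaProperty(meta map[string]string, target string) (string, bool) {
	const prefix, suffix = "${meta.", "}"
	if !strings.HasPrefix(target, prefix) || !strings.HasSuffix(target, suffix) {
		return "", false
	}
	key := strings.TrimSuffix(strings.TrimPrefix(target, prefix), suffix)
	v, ok := meta[key]
	return v, ok
}

func main() {
	meta := map[string]string{"rack": "r1"}
	fmt.Println(metaProperty(meta, "${meta.rack}"))  // r1 true
	fmt.Println(metaProperty(meta, "${node.class}")) // "" false (not handled in this sketch)
}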
+func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node, + properties map[string]struct{}) { + + for _, alloc := range allocs { + nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + if !ok { + continue + } + + properties[nProperty] = struct{}{} + } +} + +// getProperty is used to lookup the property value on the node +func getProperty(n *structs.Node, property string) (string, bool) { + if n == nil || property == "" { + return "", false + } + + val, ok := resolveConstraintTarget(property, n) + if !ok { + return "", false + } + nodeValue, ok := val.(string) + if !ok { + return "", false + } + + return nodeValue, true +} diff --git a/scheduler/stack.go b/scheduler/stack.go index d685be1c76e..ef4c3d2d447 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -44,11 +44,12 @@ type GenericStack struct { taskGroupDrivers *DriverChecker taskGroupConstraint *ConstraintChecker - proposedAllocConstraint *ProposedAllocConstraintIterator - binPack *BinPackIterator - jobAntiAff *JobAntiAffinityIterator - limit *LimitIterator - maxScore *MaxScoreIterator + distinctHostsConstraint *DistinctHostsIterator + distinctPropertyConstraint *DistinctPropertyIterator + binPack *BinPackIterator + jobAntiAff *JobAntiAffinityIterator + limit *LimitIterator + maxScore *MaxScoreIterator } // NewGenericStack constructs a stack used for selecting service placements @@ -81,11 +82,14 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs) - // Filter on constraints that are affected by propsed allocations. - s.proposedAllocConstraint = NewProposedAllocConstraintIterator(ctx, s.wrappedChecks) + // Filter on distinct host constraints. + s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks) + + // Filter on distinct property constraints. + s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint) // Upgrade from feasible to rank iterator - rankSource := NewFeasibleRankIterator(ctx, s.proposedAllocConstraint) + rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) // Apply the bin packing, this depends on the resources needed // by a particular task group. Only enable eviction for the service @@ -134,7 +138,8 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { func (s *GenericStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) - s.proposedAllocConstraint.SetJob(job) + s.distinctHostsConstraint.SetJob(job) + s.distinctPropertyConstraint.SetJob(job) s.binPack.SetPriority(job.Priority) s.jobAntiAff.SetJob(job.ID) s.ctx.Eligibility().SetJob(job) @@ -152,7 +157,8 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) - s.proposedAllocConstraint.SetTaskGroup(tg) + s.distinctHostsConstraint.SetTaskGroup(tg) + s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) s.binPack.SetTaskGroup(tg) @@ -187,13 +193,14 @@ func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*str // SystemStack is the Stack used for the System scheduler. It is designed to // attempt to make placements on all nodes. 
type SystemStack struct { - ctx Context - source *StaticIterator - wrappedChecks *FeasibilityWrapper - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker - binPack *BinPackIterator + ctx Context + source *StaticIterator + wrappedChecks *FeasibilityWrapper + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + distinctPropertyConstraint *DistinctPropertyIterator + binPack *BinPackIterator } // NewSystemStack constructs a stack used for selecting service placements @@ -222,8 +229,11 @@ func NewSystemStack(ctx Context) *SystemStack { tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs) + // Filter on distinct property constraints. + s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.wrappedChecks) + // Upgrade from feasible to rank iterator - rankSource := NewFeasibleRankIterator(ctx, s.wrappedChecks) + rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) // Apply the bin packing, this depends on the resources needed // by a particular task group. Enable eviction as system jobs are high @@ -239,6 +249,7 @@ func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { func (s *SystemStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) + s.distinctPropertyConstraint.SetJob(job) s.binPack.SetPriority(job.Priority) s.ctx.Eligibility().SetJob(job) } @@ -255,8 +266,9 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) - s.binPack.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) + s.distinctPropertyConstraint.SetTaskGroup(tg) + s.binPack.SetTaskGroup(tg) // Get the next option that satisfies the constraints. option := s.binPack.Next() diff --git a/scheduler/util.go b/scheduler/util.go index f305134cdec..c660e122ef2 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -464,11 +464,6 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // Check if the task drivers or config has changed, requires // a rolling upgrade since that cannot be done in-place. - //existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) - //if tasksUpdated(update.TaskGroup, existing) { - //continue - //} - existing := update.Alloc.Job if tasksUpdated(job, existing, update.TaskGroup.Name) { continue
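After the stack changes, candidate nodes flow source → feasibility checks → DistinctHostsIterator → DistinctPropertyIterator → ranking in the generic stack, and the system stack gains the distinct-property stage as well. A toy composition of that pipeline using the same simplified iterator shape as the earlier sketches (stand-in types, not the scheduler's):

package main

import "fmt"

type node struct{ ID, Rack string }

type iterator interface{ Next() *node }

// static yields a fixed set of nodes, like StaticIterator.
type static struct {
	nodes []*node
	i     int
}

func (s *static) Next() *node {
	if s.i >= len(s.nodes) {
		return nil
	}
	n := s.nodes[s.i]
	s.i++
	return n
}

// filter skips nodes that fail its check, the way the distinct_hosts and
// distinct_property iterators do.
type filter struct {
	source iterator
	check  func(*node) bool
}

func (f *filter) Next() *node {
	for {
		n := f.source.Next()
		if n == nil || f.check(n) {
			return n
		}
	}
}

func main() {
	src := &static{nodes: []*node{{"n1", "r1"}, {"n2", "r1"}, {"n3", "r2"}}}

	usedHosts := map[string]bool{"n1": true}
	usedRacks := map[string]bool{"r2": true}

	// Source -> distinct_hosts -> distinct_property, mirroring the order the
	// generic stack chains the iterators in.
	hosts := &filter{source: src, check: func(n *node) bool { return !usedHosts[n.ID] }}
	props := &filter{source: hosts, check: func(n *node) bool { return !usedRacks[n.Rack] }}

	for n := props.Next(); n != nil; n = props.Next() {
		fmt.Println("candidate:", n.ID) // only n2 survives both filters
	}
}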