Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

fleetd: support conflict in global unit #1571

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Documentation/unit-files-and-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Note that these requirements are derived directly from systemd, with the only ex
| `MachineOf` | Limit eligible machines to the one that hosts a specific unit. |
| `MachineMetadata` | Limit eligible machines to those with this specific metadata. |
| `Conflicts` | Prevent a unit from being collocated with other units using glob-matching on the other unit names. |
| `Global` | Schedule this unit on all agents in the cluster. A unit is considered invalid if options other than `MachineMetadata` are provided alongside `Global=true`. |
| `Global` | Schedule this unit on those agents in the cluster, which satisfy the conditions of both `MachineMetadata` and `Conflicts` if any of them is also given. A unit is considered invalid if options other than `MachineMetadata` and `Conflicts` are provided alongside `Global=true`. If `MachineMetadata` is provided alongside `Global=true`, only the agents having the metadata can be scheduled on. If `Conflicts` is provided alongside `Global=true`, only the agents not having the conflicting units can be scheduled on. The conflicting units also can not be scheduled on the agents which already have the existing conflicting global unit.|
| `Replaces` | Schedule a specified unit on another machine. A unit is considered invalid if options `Global` or `Conflicts` are provided alongside `Replaces=`. A circular replacement between multiple units is not allowed. |

See [more information][unit-scheduling] on these parameters and how they impact scheduling decisions.
Expand Down
15 changes: 12 additions & 3 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,25 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
for _, u := range units {
u := u
md := u.RequiredTargetMetadata()
if u.IsGlobal() && !machine.HasMetadata(&ms, md) {
log.Debugf("Agent unable to run global unit %s: missing required metadata", u.Name)
continue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? It does not seem to make a difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewoutp Exactly. It doesn't make any difference.
Why did it change? Short answer is, there's no particular reason.

Long answer is like that: @wuqixuan wrote the 1st patch ("fleetd: support conflict in global unit") by splitting the loop into 2 loops. However, @jonboulle suggested keeping a single loop. Unfortunately the original author @wuqixuan didn't respond, so I took it over, and wrote the 3rd patch ("agent: squash loops into a single one in desiredAgentState") to follow @jonboulle's suggestion. The patch effectively changed it back to a single loop, but code style in that particular part changed slightly. Thus now you see the difference in a unified diff view.

I suppose this difference is not critical for the entire functionality. But if you want me to change it again, sure, I can update the patch again.

Copy link
Contributor

@ewoutp ewoutp Jun 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I would not make more changes on it.

if u.IsGlobal() {
if !machine.HasMetadata(&ms, md) {
log.Debugf("Agent unable to run global unit %s: missing required metadata", u.Name)
continue
}
}

if !u.IsGlobal() {
sUnit, ok := sUnitMap[u.Name]
if !ok || sUnit.TargetMachineID == "" || sUnit.TargetMachineID != ms.ID {
continue
}
}

if cExists, _ := as.HasConflict(u.Name, u.Conflicts()); cExists {
continue
}

as.Units[u.Name] = &u
}

Expand Down
6 changes: 3 additions & 3 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (as *AgentState) unitScheduled(name string) bool {
return as.Units[name] != nil
}

// hasConflict determines whether there are any known conflicts with the given Unit
func (as *AgentState) hasConflict(pUnitName string, pConflicts []string) (found bool, conflict string) {
// HasConflict determines whether there are any known conflicts with the given Unit
func (as *AgentState) HasConflict(pUnitName string, pConflicts []string) (found bool, conflict string) {
for _, eUnit := range as.Units {
if pUnitName == eUnit.Name {
continue
Expand Down Expand Up @@ -145,7 +145,7 @@ func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
}
}

if cExists, cJobName := as.hasConflict(j.Name, j.Conflicts()); cExists {
if cExists, cJobName := as.HasConflict(j.Name, j.Conflicts()); cExists {
return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
}

Expand Down
2 changes: 1 addition & 1 deletion agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestHasConflicts(t *testing.T) {
}

for i, tt := range tests {
got, conflict := tt.cState.hasConflict(tt.job.Name, tt.job.Conflicts())
got, conflict := tt.cState.HasConflict(tt.job.Name, tt.job.Conflicts())
if got != tt.want {
var msg string
if tt.want == true {
Expand Down
2 changes: 0 additions & 2 deletions api/units.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ func ValidateOptions(opts []*schema.UnitOption) error {
return errors.New("MachineID cannot be used with Replaces")
case isGlobal && hasPeers:
return errors.New("Global cannot be used with Peers")
case isGlobal && hasConflicts:
return errors.New("Global cannot be used with Conflicts")
case isGlobal && hasReplaces:
return errors.New("Global cannot be used with Replaces")
case hasConflicts && hasReplaces:
Expand Down
7 changes: 4 additions & 3 deletions api/units_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func TestValidateOptions(t *testing.T) {
},
true,
},
// Global with Peers/Conflicts no good
// Global with Conflicts is ok
{
[]*schema.UnitOption{
&schema.UnitOption{
Expand All @@ -660,7 +660,7 @@ func TestValidateOptions(t *testing.T) {
},
makeConflictUO("foo.service"),
},
false,
true,
},
{
[]*schema.UnitOption{
Expand All @@ -671,8 +671,9 @@ func TestValidateOptions(t *testing.T) {
},
makeConflictUO("bar.service"),
},
false,
true,
},
// Global with peer no good
{
[]*schema.UnitOption{
&schema.UnitOption{
Expand Down
9 changes: 7 additions & 2 deletions engine/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ func (cs *clusterState) agents() map[string]*agent.AgentState {
for _, gu := range cs.gUnits {
gu := gu
for _, a := range agents {
if machine.HasMetadata(a.MState, gu.RequiredTargetMetadata()) {
a.Units[gu.Name] = gu
if !machine.HasMetadata(a.MState, gu.RequiredTargetMetadata()) {
continue
}

if cExists, _ := a.HasConflict(gu.Name, gu.Conflicts()); cExists {
continue
}
a.Units[gu.Name] = gu
}
}

Expand Down
6 changes: 0 additions & 6 deletions fleetctl/fleetctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,6 @@ MachineOf=zxcvq`),
"foo.service",
newUnitFile(t, `[X-Fleet]
Global=true
Conflicts=bar`),
},
{
"foo.service",
newUnitFile(t, `[X-Fleet]
Global=true
Replaces=bar`),
},
{
Expand Down
9 changes: 9 additions & 0 deletions functional/fixtures/units/conflict-global.0.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Global=true
Conflicts=conflict-global.*.service
9 changes: 9 additions & 0 deletions functional/fixtures/units/conflict-global.1.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Global=true
Conflicts=conflict-global.*.service
80 changes: 80 additions & 0 deletions functional/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,3 +582,83 @@ func TestScheduleGlobalUnits(t *testing.T) {
}
}
}

// TestScheduleGlobalConflicts starts 2 global units that conflict with each
// other, and check if only the first one can be found.
func TestScheduleGlobalConflicts(t *testing.T) {
// Create a three-member cluster
cluster, err := platform.NewNspawnCluster("smoke")
if err != nil {
t.Fatal(err)
}
defer cluster.Destroy(t)
members, err := platform.CreateNClusterMembers(cluster, 3)
if err != nil {
t.Fatal(err)
}
m0 := members[0]
machines, err := cluster.WaitForNMachines(m0, 3)
if err != nil {
t.Fatal(err)
}

cfGlobal0 := "fixtures/units/conflict-global.0.service"
cfGlobal1 := "fixtures/units/conflict-global.1.service"

// Launch a global unit
stdout, stderr, err := cluster.Fleetctl(m0, "start", "--no-block", cfGlobal0)
if err != nil {
t.Fatalf("Failed starting units: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err)
}

// the global unit should show up active on 3 machines
_, err = cluster.WaitForNActiveUnits(m0, 3)
if err != nil {
t.Fatal(err)
}

// Now add another global unit, which actually should not be started.
stdout, stderr, err = cluster.Fleetctl(m0, "start", "--no-block", cfGlobal1)
if err != nil {
t.Fatalf("Failed starting unit: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err)
}

// Should see only 3 units
states, err := cluster.WaitForNActiveUnits(m0, 3)
if err != nil {
t.Fatal(err)
}

// Each machine should have a single global unit conflict-global.0.service,
// but not conflict-global.1.service.
us0 := states[path.Base(cfGlobal0)]
us1 := states[path.Base(cfGlobal1)]
for _, mach := range machines {
var found bool
for _, state := range us0 {
if state.Machine == mach {
found = true
break
}
}
if !found {
t.Fatalf("Did not find global unit on machine %v", mach)
t.Logf("Found unit states:")
for _, state := range states {
t.Logf("%#v", state)
}
}

found = false
for _, state := range us1 {
if state.Machine == mach {
found = true
break
}
}
if found {
t.Fatalf("Did find global unit %s on machine %v", us1, mach)
t.Logf("Global units were not conflicted as expected.")
}
}
}