Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1572 from endocode/dongsu/fleetd-replaces-unit
Browse files Browse the repository at this point in the history
fleetd: introduce Replaces option in unit files
  • Loading branch information
Dongsu Park committed May 30, 2016
2 parents 6f3df2e + b58c637 commit 1d6e74a
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 4 deletions.
1 change: 1 addition & 0 deletions Documentation/unit-files-and-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Note that these requirements are derived directly from systemd, with the only ex
| `MachineMetadata` | Limit eligible machines to those with this specific metadata. |
| `Conflicts` | Prevent a unit from being collocated with other units using glob-matching on the other unit names. |
| `Global` | Schedule this unit on all agents in the cluster. A unit is considered invalid if options other than `MachineMetadata` are provided alongside `Global=true`. |
| `Replaces` | Schedule a specified unit on another machine. A unit is considered invalid if options `Global` or `Conflicts` are provided alongside `Replaces=`. A circular replacement between multiple units is not allowed. |

See [more information][unit-scheduling] on these parameters and how they impact scheduling decisions.

Expand Down
24 changes: 24 additions & 0 deletions agent/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,30 @@ func TestAbleToRun(t *testing.T) {
job: newTestJobWithXFleetValues(t, "Conflicts=ping.service"),
want: false,
},

// no replaces found
{
dState: &AgentState{
MState: &machine.MachineState{ID: "123"},
Units: map[string]*job.Unit{
"ping.service": &job.Unit{Name: "ping.service"},
},
},
job: newTestJobWithXFleetValues(t, "Replaces=pong.service"),
want: true,
},

// replaces found
{
dState: &AgentState{
MState: &machine.MachineState{ID: "123"},
Units: map[string]*job.Unit{
"ping.service": &job.Unit{Name: "ping.service"},
},
},
job: newTestJobWithXFleetValues(t, "Replaces=ping.service"),
want: false,
},
}

for i, tt := range tests {
Expand Down
49 changes: 49 additions & 0 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,48 @@ func (as *AgentState) hasConflict(pUnitName string, pConflicts []string) (found
return
}

// hasReplace determines whether there are any known replaces with the given Unit
func (as *AgentState) hasReplace(pUnitName string, pReplaces []string) (found bool, replace string) {
for _, eUnit := range as.Units {
foundPrepl := false
foundErepl := false
retStr := ""

if pUnitName == eUnit.Name {
continue
}

for _, pReplace := range pReplaces {
if globMatches(pReplace, eUnit.Name) {
foundPrepl = true
retStr = eUnit.Name
break
}
}

for _, eReplace := range eUnit.Replaces() {
if globMatches(eReplace, pUnitName) {
foundErepl = true
retStr = eUnit.Name
break
}
}

// Only 1 of 2 matches must be found. If both matches are found,
// it means it's a circular replace situation, which could result in
// an infinite loop. So ignore such replace options.
if (foundPrepl && foundErepl) || (!foundPrepl && !foundErepl) {
continue
} else {
found = true
replace = retStr
return
}
}

return
}

func globMatches(pattern, target string) bool {
matched, err := path.Match(pattern, target)
if err != nil {
Expand All @@ -81,6 +123,7 @@ func globMatches(pattern, target string) bool {
// - Agent must have all of the Job's required metadata (if any)
// - Agent must have all required Peers of the Job scheduled locally (if any)
// - Job must not conflict with any other Units scheduled to the agent
// - Job must specially handle replaced units to be rescheduled
func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
if tgt, ok := j.RequiredTarget(); ok && !as.MState.MatchID(tgt) {
return false, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt)
Expand All @@ -106,5 +149,11 @@ func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
}

// Handle Replace option specially, by returning a special string
// "jobreschedule" as reason.
if cExists, _ := as.hasReplace(j.Name, j.Replaces()); cExists {
return false, job.JobReschedule
}

return true, ""
}
77 changes: 77 additions & 0 deletions agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,83 @@ func TestHasConflicts(t *testing.T) {
}
}

func TestHasReplaces(t *testing.T) {
tests := []struct {
cState *AgentState
job *job.Job
want bool
replace string
}{
// empty current state causes no replaces
{
cState: NewAgentState(&machine.MachineState{ID: "XXX"}),
job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")},
want: false,
},

// existing Job has replace with new Job
{
cState: &AgentState{
MState: &machine.MachineState{ID: "XXX"},
Units: map[string]*job.Unit{
"bar.service": &job.Unit{
Name: "bar.service",
Unit: fleetUnit(t, "Replaces=foo.service"),
},
},
},
job: &job.Job{Name: "foo.service", Unit: unit.UnitFile{}},
want: true,
replace: "bar.service",
},

// new Job has replace with existing job
{
cState: &AgentState{
MState: &machine.MachineState{ID: "XXX"},
Units: map[string]*job.Unit{
"bar.service": &job.Unit{
Name: "bar.service",
Unit: unit.UnitFile{},
},
},
},
job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")},
want: true,
replace: "bar.service",
},

// both jobs have replace with each other: it should fail
{
cState: &AgentState{
MState: &machine.MachineState{ID: "XXX"},
Units: map[string]*job.Unit{
"bar.service": &job.Unit{
Name: "bar.service",
Unit: fleetUnit(t, "Replaces=foo.service"),
},
},
},
job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")},
want: false,
replace: "bar.service",
},
}

for i, tt := range tests {
got, replace := tt.cState.hasReplace(tt.job.Name, tt.job.Replaces())
if got != tt.want {
var msg string
if tt.want == true {
msg = fmt.Sprintf("expected no replace, found replace with Job %q", replace)
} else {
msg = fmt.Sprintf("expected replace with Job %q, got none", replace)
}
t.Errorf("case %d: %s", i, msg)
}
}
}

func TestGlobMatches(t *testing.T) {
tests := []struct {
pattern string
Expand Down
14 changes: 14 additions & 0 deletions api/units.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func ValidateOptions(opts []*schema.UnitOption) error {
Unit: *uf,
}
conflicts := pkg.NewUnsafeSet(j.Conflicts()...)
replaces := pkg.NewUnsafeSet(j.Replaces()...)
peers := pkg.NewUnsafeSet(j.Peers()...)
for _, peer := range peers.Values() {
for _, conflict := range conflicts.Values() {
Expand All @@ -231,9 +232,16 @@ func ValidateOptions(opts []*schema.UnitOption) error {
return fmt.Errorf("unresolvable requirements: peer %q matches conflict %q", peer, conflict)
}
}
for _, replace := range replaces.Values() {
matched, _ := path.Match(replace, peer)
if matched {
return fmt.Errorf("unresolvable requirements: peer %q matches replace %q", peer, replace)
}
}
}
hasPeers := peers.Length() != 0
hasConflicts := conflicts.Length() != 0
hasReplaces := replaces.Length() != 0
_, hasReqTarget := j.RequiredTarget()
u := &job.Unit{
Unit: *uf,
Expand All @@ -247,10 +255,16 @@ func ValidateOptions(opts []*schema.UnitOption) error {
return errors.New("MachineID cannot be used with Conflicts")
case hasReqTarget && isGlobal:
return errors.New("MachineID cannot be used with Global")
case hasReqTarget && hasReplaces:
return errors.New("MachineID cannot be used with Replaces")
case isGlobal && hasPeers:
return errors.New("Global cannot be used with Peers")
case isGlobal && hasConflicts:
return errors.New("Global cannot be used with Conflicts")
case isGlobal && hasReplaces:
return errors.New("Global cannot be used with Replaces")
case hasConflicts && hasReplaces:
return errors.New("Conflicts cannot be used with Replaces")
}

return nil
Expand Down
73 changes: 72 additions & 1 deletion api/units_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,14 @@ func makeConflictUO(name string) *schema.UnitOption {
}
}

func makeReplaceUO(name string) *schema.UnitOption {
return &schema.UnitOption{
Section: "X-Fleet",
Name: "Replaces",
Value: name,
}
}

func makePeerUO(name string) *schema.UnitOption {
return &schema.UnitOption{
Section: "X-Fleet",
Expand Down Expand Up @@ -543,6 +551,39 @@ func TestValidateOptions(t *testing.T) {
},
false,
},
// Replaces
// non-overlapping replace is fine
{
[]*schema.UnitOption{
makeReplaceUO("foo.service"),
makeReplaceUO("bar.service"),
},
true,
},
{
[]*schema.UnitOption{
makeReplaceUO("foo.service"),
makePeerUO("bar.service"),
},
true,
},
// replace + conflict is not good
{
[]*schema.UnitOption{
makeReplaceUO("foo.service"),
makeConflictUO("bar.service"),
},
false,
},
// circular replaces are not good
{
[]*schema.UnitOption{
makeReplaceUO("foo.service"),
makeReplaceUO("bar.service"),
makePeerUO("bar.service"),
},
false,
},
// MachineID is fine by itself
{
[]*schema.UnitOption{
Expand Down Expand Up @@ -576,7 +617,25 @@ func TestValidateOptions(t *testing.T) {
},
{
[]*schema.UnitOption{
makeIDUO("zyxwvutsr"), makeConflictUO("foo.service"), makeConflictUO("bar.service"),
makeIDUO("zyxwvutsr"),
makeConflictUO("foo.service"),
makeConflictUO("bar.service"),
},
false,
},
// MachineID with Replaces no good
{
[]*schema.UnitOption{
makeIDUO("abcdefghi"),
makeReplaceUO("bar.service"),
},
false,
},
{
[]*schema.UnitOption{
makeIDUO("zyxwvutsr"),
makeReplaceUO("foo.service"),
makeReplaceUO("bar.service"),
},
false,
},
Expand Down Expand Up @@ -649,6 +708,18 @@ func TestValidateOptions(t *testing.T) {
},
false,
},
// Global with Replaces no good
{
[]*schema.UnitOption{
&schema.UnitOption{
Section: "X-Fleet",
Name: "Global",
Value: "true",
},
makeReplaceUO("foo.service"),
},
false,
},
}
for i, tt := range testCases {
err := ValidateOptions(tt.opts)
Expand Down
11 changes: 8 additions & 3 deletions engine/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,15 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
}

var able bool
if able, reason = as.AbleToRun(j); !able {
var ableReason string
if able, ableReason = as.AbleToRun(j); !able {
unschedule = true
reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID)
metrics.ReportEngineReconcileFailure(metrics.RunFailure)
if ableReason == job.JobReschedule {
reason = ableReason
} else {
reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID)
metrics.ReportEngineReconcileFailure(metrics.RunFailure)
}
return
}

Expand Down
18 changes: 18 additions & 0 deletions fleetctl/fleetctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,24 @@ MachineOf=zxcvq`),
Global=true
Conflicts=bar`),
},
{
"foo.service",
newUnitFile(t, `[X-Fleet]
Global=true
Replaces=bar`),
},
{
"foo.service",
newUnitFile(t, `[X-Fleet]
Conflicts=bar
Replaces=bar`),
},
{
"foo.service",
newUnitFile(t, `[X-Fleet]
MachineOf=abcd
Replaces=abcd`),
},
}
for i, tt = range testCases {
un = tt.name
Expand Down
5 changes: 5 additions & 0 deletions functional/fixtures/units/replace.0.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"
8 changes: 8 additions & 0 deletions functional/fixtures/units/replace.1.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Replaces=replace.0.service
Loading

0 comments on commit 1d6e74a

Please sign in to comment.