Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

fleetctl: {load|unload|start|stop} and get unit code cleanups #1469

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fleetctl/destroy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestRunDestroyUnits(t *testing.T) {
var wg sync.WaitGroup
errchan := make(chan error)

cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units))
cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units), false)

wg.Add(2)
go func() {
Expand Down
173 changes: 132 additions & 41 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,51 @@ func getChecker() *ssh.HostKeyChecker {
return ssh.NewHostKeyChecker(keyFile)
}

// getUnitFile attempts to get a UnitFile configuration
// It takes a unit file name as a parameter and tries first to lookup
// the unit from the local disk. If it fails, it checks if the provided
// file name may reference an instance of a template unit, if so, it
// tries to get the template configuration either from the registry or
// the local disk.
// It returns a UnitFile configuration or nil; and any error ecountered
func getUnitFile(file string) (*unit.UnitFile, error) {
var uf *unit.UnitFile
name := unitNameMangle(file)

log.Debugf("Looking for Unit(%s) or its corresponding template", name)

// Assume that the file references a local unit file on disk and
// attempt to load it, if it exists
if _, err := os.Stat(file); !os.IsNotExist(err) {
uf, err = getUnitFromFile(file)
if err != nil {
return nil, fmt.Errorf("failed getting Unit(%s) from file: %v", file, err)
}
} else {
// Otherwise (if the unit file does not exist), check if the
// name appears to be an instance of a template unit
info := unit.NewUnitNameInfo(name)
if info == nil {
return nil, fmt.Errorf("error extracting information from unit name %s", name)
} else if !info.IsInstance() {
return nil, fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", name)
}

// If it is an instance check for a corresponding template
// unit in the Registry or disk.
// If we found a template unit, later we create a
// near-identical instance unit in the Registry - same
// unit file as the template, but different name
uf, err = getUnitFileFromTemplate(info, file)
if err != nil {
return nil, fmt.Errorf("failed getting Unit(%s) from template: %v", file, err)
}
}

log.Debugf("Found Unit(%s)", name)
return uf, nil
}

// getUnitFromFile attempts to load a Unit from a given filename
// It returns the Unit or nil, and any error encountered
func getUnitFromFile(file string) (*unit.UnitFile, error) {
Expand All @@ -497,6 +542,39 @@ func getUnitFromFile(file string) (*unit.UnitFile, error) {
return unit.NewUnitFile(string(out))
}

// getUnitFileFromTemplate attempts to get a Unit from a template unit that
// is either in the registry or on the file system
// It takes two arguments, the template information and the unit file name
// It returns the Unit or nil; and any error encountered
func getUnitFileFromTemplate(uni *unit.UnitNameInfo, fileName string) (*unit.UnitFile, error) {
var uf *unit.UnitFile

tmpl, err := cAPI.Unit(uni.Template)
if err != nil {
return nil, fmt.Errorf("unable to retrieve Unit(%s) from Registry: %v", uni.Template, err)
}

if tmpl != nil {
warnOnDifferentLocalUnit(fileName, tmpl)
uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options)
log.Debugf("Template Unit(%s) found in registry", uni.Template)
} else {
// Finally, if we could not find a template unit in the Registry,
// check the local disk for one instead
filePath := path.Join(path.Dir(fileName), uni.Template)
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil, fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", uni.Template)
}

uf, err = getUnitFromFile(filePath)
if err != nil {
return nil, fmt.Errorf("unable to load Unit(%s) from file: %v", filePath, err)
}
}

return uf, nil
}

func getTunnelFlag() string {
tun := globalFlags.Tunnel
if tun != "" && !strings.Contains(tun, ":") {
Expand Down Expand Up @@ -599,8 +677,6 @@ func lazyCreateUnits(args []string) error {
errchan := make(chan error)
var wg sync.WaitGroup
for _, arg := range args {
// TODO(jonboulle): this loop is getting too unwieldy; factor it out

arg = maybeAppendDefaultUnitType(arg)
name := unitNameMangle(arg)

Expand All @@ -615,45 +691,12 @@ func lazyCreateUnits(args []string) error {
continue
}

var uf *unit.UnitFile
// Failing that, assume the name references a local unit file on disk, and attempt to load that, if it exists
// TODO(mischief): consolidate these two near-identical codepaths
if _, err := os.Stat(arg); !os.IsNotExist(err) {
uf, err = getUnitFromFile(arg)
if err != nil {
return fmt.Errorf("failed getting Unit(%s) from file: %v", arg, err)
}
} else {
// Otherwise (if the unit file does not exist), check if the name appears to be an instance unit,
// and if so, check for a corresponding template unit in the Registry
uni := unit.NewUnitNameInfo(name)
if uni == nil {
return fmt.Errorf("error extracting information from unit name %s", name)
} else if !uni.IsInstance() {
return fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", name)
}
tmpl, err := cAPI.Unit(uni.Template)
if err != nil {
return fmt.Errorf("error retrieving template Unit(%s) from Registry: %v", uni.Template, err)
}

// Finally, if we could not find a template unit in the Registry, check the local disk for one instead
if tmpl == nil {
file := path.Join(path.Dir(arg), uni.Template)
if _, err := os.Stat(file); os.IsNotExist(err) {
return fmt.Errorf("unable to find Unit(%s) or template Unit(%s) in Registry or on filesystem", name, uni.Template)
}
uf, err = getUnitFromFile(file)
if err != nil {
return fmt.Errorf("failed getting template Unit(%s) from file: %v", uni.Template, err)
}
} else {
warnOnDifferentLocalUnit(arg, tmpl)
uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options)
}

// If we found a template unit, create a near-identical instance unit in
// the Registry - same unit file as the template, but different name
// Assume that the name references a local unit file on
// disk or if it is an instance unit and if so get its
// corresponding unit
uf, err := getUnitFile(arg)
if err != nil {
return err
}

_, err = createUnit(name, uf)
Expand Down Expand Up @@ -746,6 +789,54 @@ func setTargetStateOfUnits(units []string, state job.JobState) ([]*schema.Unit,
return triggered, nil
}

// getBlockAttempts gets the correct value of how many attempts to try
// before giving up on an operation.
// It returns a negative value which means do not block, if zero is
// returned then it means try forever, and if a positive value is
// returned then try up to that value
func getBlockAttempts() int {
// By default we wait forever
var attempts int = 0

if sharedFlags.BlockAttempts > 0 {
attempts = sharedFlags.BlockAttempts
}

if sharedFlags.NoBlock {
attempts = -1
}

return attempts
}

// tryWaitForUnitStates tries to wait for units to reach the desired state.
// It takes 5 arguments, the units to wait for, the desired state, the
// desired JobState, how many attempts before timing out and a writer
// interface.
// tryWaitForUnitStates polls each of the indicated units until they
// reach the desired state. If maxAttempts is negative, then it will not
// wait, it will assume that all units reached their desired state.
// If maxAttempts is zero tryWaitForUnitStates will retry forever, and
// if it is greater than zero, it will retry up to the indicated value.
// It returns 0 on success or 1 on errors.
func tryWaitForUnitStates(units []string, state string, js job.JobState, maxAttempts int, out io.Writer) (ret int) {
// We do not wait just assume we reached the desired state
if maxAttempts <= -1 {
for _, name := range units {
stdout("Triggered unit %s %s", name, state)
}
return
}

errchan := waitForUnitStates(units, js, maxAttempts, out)
for err := range errchan {
stderr("Error waiting for units: %v", err)
ret = 1
}

return
}

// waitForUnitStates polls each of the indicated units until each of their
// states is equal to that which the caller indicates, or until the
// polling operation times out. waitForUnitStates will retry forever, or
Expand Down
111 changes: 91 additions & 20 deletions fleetctl/fleetctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/coreos/fleet/job"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
"github.com/coreos/fleet/schema"
"github.com/coreos/fleet/unit"
"github.com/coreos/fleet/version"

Expand All @@ -34,7 +35,7 @@ type commandTestResults struct {
expectedExit int
}

func newFakeRegistryForCommands(unitPrefix string, unitCount int) client.API {
func newFakeRegistryForCommands(unitPrefix string, unitCount int, template bool) client.API {
// clear machineStates for every invocation
machineStates = nil
machines := []machine.MachineState{
Expand All @@ -43,30 +44,43 @@ func newFakeRegistryForCommands(unitPrefix string, unitCount int) client.API {
}

jobs := make([]job.Job, 0)
appendJobsForTests(&jobs, machines[0], unitPrefix, unitCount)
appendJobsForTests(&jobs, machines[1], unitPrefix, unitCount)
appendJobsForTests(&jobs, machines[0], unitPrefix, unitCount, template)
appendJobsForTests(&jobs, machines[1], unitPrefix, unitCount, template)

states := make([]unit.UnitState, 0)
for i := 1; i <= unitCount; i++ {
if template {
state := unit.UnitState{
UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i),
UnitName: fmt.Sprintf("%s@.service", unitPrefix),
LoadState: "loaded",
ActiveState: "active",
SubState: "listening",
ActiveState: "inactive",
SubState: "dead",
MachineID: machines[0].ID,
}
states = append(states, state)
}
state.MachineID = machines[1].ID
states = append(states, state)
} else {
for i := 1; i <= unitCount; i++ {
state := unit.UnitState{
UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i),
LoadState: "loaded",
ActiveState: "active",
SubState: "listening",
MachineID: machines[0].ID,
}
states = append(states, state)
}

for i := 1; i <= unitCount; i++ {
state := unit.UnitState{
UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i),
LoadState: "loaded",
ActiveState: "inactive",
SubState: "dead",
MachineID: machines[1].ID,
for i := 1; i <= unitCount; i++ {
state := unit.UnitState{
UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i),
LoadState: "loaded",
ActiveState: "inactive",
SubState: "dead",
MachineID: machines[1].ID,
}
states = append(states, state)
}
states = append(states, state)
}

reg := registry.NewFakeRegistry()
Expand All @@ -77,14 +91,40 @@ func newFakeRegistryForCommands(unitPrefix string, unitCount int) client.API {
return &client.RegistryClient{Registry: reg}
}

func appendJobsForTests(jobs *[]job.Job, machine machine.MachineState, prefix string, unitCount int) {
for i := 1; i <= unitCount; i++ {
func appendJobsForTests(jobs *[]job.Job, machine machine.MachineState, prefix string, unitCount int, template bool) {
if template {
// for start or load operations we may need to wait
// during the creation of units, and since this is a
// faked registry just set the 'Global' flag so we don't
// block forever
Options := []*schema.UnitOption{
&schema.UnitOption{
Section: "Unit",
Name: "Description",
Value: fmt.Sprintf("Template %[email protected]", prefix),
},
&schema.UnitOption{
Section: "X-Fleet",
Name: "Global",
Value: "true",
},
}
uf := schema.MapSchemaUnitOptionsToUnitFile(Options)
j := job.Job{
Name: fmt.Sprintf("%s%d.service", prefix, i),
Unit: unit.UnitFile{},
Name: fmt.Sprintf("%s@.service", prefix),
Unit: *uf,
TargetMachineID: machine.ID,
}
*jobs = append(*jobs, j)
} else {
for i := 1; i <= unitCount; i++ {
j := job.Job{
Name: fmt.Sprintf("%s%d.service", prefix, i),
Unit: unit.UnitFile{},
TargetMachineID: machine.ID,
}
*jobs = append(*jobs, j)
}
}

return
Expand Down Expand Up @@ -181,6 +221,37 @@ func TestUnitNameMangle(t *testing.T) {
}
}

func TestGetBlockAttempts(t *testing.T) {
oldNoBlock := sharedFlags.NoBlock
oldBlockAttempts := sharedFlags.BlockAttempts

defer func() {
sharedFlags.NoBlock = oldNoBlock
sharedFlags.BlockAttempts = oldBlockAttempts
}()

var blocktests = []struct {
noBlock bool
blockAttempts int
expected int
}{
{true, 0, -1},
{true, -1, -1},
{true, 9999, -1},
{false, 0, 0},
{false, -1, 0},
{false, 9999, 9999},
}

for _, tt := range blocktests {
sharedFlags.NoBlock = tt.noBlock
sharedFlags.BlockAttempts = tt.blockAttempts
if n := getBlockAttempts(); n != tt.expected {
t.Errorf("got %d, want %d", n, tt.expected)
}
}
}

func newUnitFile(t *testing.T, contents string) *unit.UnitFile {
uf, err := unit.NewUnitFile(contents)
if err != nil {
Expand Down
12 changes: 1 addition & 11 deletions fleetctl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,7 @@ func runLoadUnits(args []string) (exit int) {
}
}

if !sharedFlags.NoBlock {
errchan := waitForUnitStates(loading, job.JobStateLoaded, sharedFlags.BlockAttempts, os.Stdout)
for err := range errchan {
stderr("Error waiting for units: %v", err)
exit = 1
}
} else {
for _, name := range loading {
stdout("Triggered unit %s load", name)
}
}
exit = tryWaitForUnitStates(loading, "load", job.JobStateLoaded, getBlockAttempts(), os.Stdout)

return
}
Loading