Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1679 from endocode/dongsu/fleetctl-capi-unitstate…
Browse files Browse the repository at this point in the history
…-single

api: support cAPI.UnitState() for a single unit
  • Loading branch information
Dongsu Park authored Nov 10, 2016
2 parents 2b2eda2 + 9137746 commit 97a7adb
Show file tree
Hide file tree
Showing 19 changed files with 536 additions and 104 deletions.
38 changes: 33 additions & 5 deletions api/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func wireUpStateResource(mux *http.ServeMux, prefix string, tokenLimit int, cAPI
base := path.Join(prefix, "state")
sr := stateResource{cAPI, base, uint16(tokenLimit)}
mux.Handle(base, &sr)
mux.Handle(base+"/", &sr)
}

type stateResource struct {
Expand All @@ -37,12 +38,23 @@ type stateResource struct {
}

func (sr *stateResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET supported against this resource"))
return
if isCollectionPath(sr.basePath, req.URL.Path) {
switch req.Method {
case "GET":
sr.list(rw, req)
default:
sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET supported against this resource"))
}
} else if item, ok := isItemPath(sr.basePath, req.URL.Path); ok {
switch req.Method {
case "GET":
sr.get(rw, req, item)
default:
sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET supported against this resource"))
}
} else {
sendError(rw, http.StatusNotFound, nil)
}

sr.list(rw, req)
}

func (sr *stateResource) list(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -77,6 +89,22 @@ func (sr *stateResource) list(rw http.ResponseWriter, req *http.Request) {
sendResponse(rw, http.StatusOK, &page)
}

func (sr *stateResource) get(rw http.ResponseWriter, req *http.Request, item string) {
us, err := sr.cAPI.UnitState(item)
if err != nil {
log.Errorf("Failed fetching UnitState(%s) from Registry: %v", item, err)
sendError(rw, http.StatusInternalServerError, nil)
return
}

if us == nil {
sendError(rw, http.StatusNotFound, errors.New("unit state does not exist"))
return
}

sendResponse(rw, http.StatusOK, *us)
}

func getUnitStatePage(cAPI client.API, machineID, unitName string, tok PageToken) (*schema.UnitStatePage, error) {
states, err := cAPI.UnitStates()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type API interface {

Unit(string) (*schema.Unit, error)
Units() ([]*schema.Unit, error)
UnitState(string) (*schema.UnitState, error)
UnitStates() ([]*schema.UnitState, error)

SetUnitTargetState(name, target string) error
Expand Down
8 changes: 8 additions & 0 deletions client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func (c *HTTPClient) UnitStates() ([]*schema.UnitState, error) {
return states, nil
}

func (c *HTTPClient) UnitState(name string) (*schema.UnitState, error) {
u, err := c.svc.UnitState.Get(name).Do()
if err != nil && !is404(err) {
return nil, err
}
return u, nil
}

func (c *HTTPClient) DestroyUnit(name string) error {
return c.svc.Units.Delete(name).Do()
}
Expand Down
9 changes: 9 additions & 0 deletions client/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ func (rc *RegistryClient) CreateUnit(u *schema.Unit) error {
return rc.Registry.CreateUnit(&rUnit)
}

func (rc *RegistryClient) UnitState(name string) (*schema.UnitState, error) {
rUnitState, err := rc.Registry.UnitState(name)
if err != nil {
return nil, err
}

return schema.MapUnitStateToSchemaUnitState(rUnitState), nil
}

func (rc *RegistryClient) UnitStates() ([]*schema.UnitState, error) {
rUnitStates, err := rc.Registry.UnitStates()
if err != nil {
Expand Down
178 changes: 80 additions & 98 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ recommended to upgrade fleetctl to prevent incompatibility issues.
clientDriverEtcd = "etcd"

defaultEndpoint = "unix:///var/run/fleet.sock"
defaultSleepTime = 500 * time.Millisecond
defaultSleepTime = 2000 * time.Millisecond
)

var (
Expand Down Expand Up @@ -177,12 +177,6 @@ type Command struct {

}

type suState struct {
LoadState string
ActiveState string
SubState string
}

func getFlags(flagset *flag.FlagSet) (flags []*flag.Flag) {
flags = make([]*flag.Flag, 0)
flagset.VisitAll(func(f *flag.Flag) {
Expand Down Expand Up @@ -1012,63 +1006,64 @@ func waitForUnitStates(units []string, js job.JobState, maxAttempts int, out io.
func checkUnitState(name string, js job.JobState, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
defer wg.Done()

sleep := defaultSleepTime

if maxAttempts < 1 {
for {
if assertUnitState(name, js, out) {
return
}
time.Sleep(sleep)
}
} else {
for attempt := 0; attempt < maxAttempts; attempt++ {
if assertUnitState(name, js, out) {
return
}
time.Sleep(sleep)
}
errchan <- fmt.Errorf("timed out waiting for unit %s to report state %s", name, js)
}
}

func assertUnitState(name string, js job.JobState, out io.Writer) (ret bool) {
var state string
func assertUnitState(name string, js job.JobState, out io.Writer) bool {
fetchUnitState := func() error {
var state string

u, err := cAPI.Unit(name)
if err != nil {
log.Warningf("Error retrieving Unit(%s) from Registry: %v", name, err)
return
}
if u == nil {
log.Warningf("Unit %s not found", name)
return
}
u, err := cAPI.Unit(name)
if err != nil {
return fmt.Errorf("Error retrieving Unit(%s) from Registry: %v", name, err)
}
if u == nil {
return fmt.Errorf("Unit %s not found", name)
}

// If this is a global unit, CurrentState will never be set. Instead, wait for DesiredState.
if suToGlobal(*u) {
state = u.DesiredState
} else {
state = u.CurrentState
}
// If this is a global unit, CurrentState will never be set. Instead, wait for DesiredState.
if suToGlobal(*u) {
state = u.DesiredState
} else {
state = u.CurrentState
}

if job.JobState(state) != js {
log.Debugf("Waiting for Unit(%s) state(%s) to be %s", name, job.JobState(state), js)
return
}
if job.JobState(state) != js {
return fmt.Errorf("Waiting for Unit(%s) state(%s) to be %s", name, job.JobState(state), js)
}

ret = true
msg := fmt.Sprintf("Unit %s %s", name, u.CurrentState)
msg := fmt.Sprintf("Unit %s %s", name, u.CurrentState)

if u.MachineID != "" {
ms := cachedMachineState(u.MachineID)
if ms != nil {
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(*ms, false))
if u.MachineID != "" {
ms := cachedMachineState(u.MachineID)
if ms != nil {
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(*ms, false))
}
}

fmt.Fprintln(out, msg)
return nil
}
timeout, err := waitForState(fetchUnitState)
if err != nil {
log.Errorf("Failed to find unit %s within %v, err: %v", name, timeout, err)
return false
}

fmt.Fprintln(out, msg)
return
return true
}

// tryWaitForSystemdActiveState tries to wait for systemd units to reach an
Expand All @@ -1095,17 +1090,11 @@ func tryWaitForSystemdActiveState(units []string, maxAttempts int) (err error) {
// waitForSystemdActiveState tries to assert that the given unit becomes
// active, making use of multiple goroutines that check unit states.
func waitForSystemdActiveState(units []string, maxAttempts int) (errch chan error) {
apiStates, err := cAPI.UnitStates()
if err != nil {
errch <- fmt.Errorf("Error retrieving list of units: %v", err)
return
}

errchan := make(chan error)
var wg sync.WaitGroup
for _, name := range units {
wg.Add(1)
go checkSystemdActiveState(apiStates, name, maxAttempts, &wg, errchan)
go checkSystemdActiveState(name, maxAttempts, &wg, errchan)
}

go func() {
Expand All @@ -1116,7 +1105,7 @@ func waitForSystemdActiveState(units []string, maxAttempts int) (errch chan erro
return errchan
}

func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAttempts int, wg *sync.WaitGroup, errchan chan error) {
func checkSystemdActiveState(name string, maxAttempts int, wg *sync.WaitGroup, errchan chan error) {
defer wg.Done()

// "isInf == true" means "blocking forever until it succeeded".
Expand All @@ -1128,7 +1117,7 @@ func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAtte
}

for attempt := 0; attempt < maxAttempts; attempt++ {
if err := assertFetchSystemdActiveState(apiStates, name); err == nil {
if err := assertSystemdActiveState(name); err == nil {
return
} else {
errchan <- err
Expand All @@ -1140,64 +1129,34 @@ func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAtte
}
}

func assertFetchSystemdActiveState(apiStates []*schema.UnitState, name string) error {
if err := assertSystemdActiveState(apiStates, name); err == nil {
return nil
}

// If the assertion failed, we again need to get unit states via cAPI,
// to retry the assertion repeatedly.
//
// NOTE: Ideally we should be able to fetch the state only for a single
// unit. However, we cannot do that for now, because cAPI.UnitState()
// is not available. In the future we need to implement cAPI.UnitState()
// and all dependendent parts all over the tree in fleet, (schema,
// etcdRegistry, rpcRegistry, etc) to replace UnitStates() in this place
// with the new method UnitState(). In practice, calling UnitStates() here
// is not as badly inefficient as it looks, because it will be anyway
// rarely called only when the assertion failed. - dpark 20160907

time.Sleep(defaultSleepTime)
// assertSystemdActiveState determines if a given systemd unit is actually
// in the active state, making use of cAPI.
// It repeatedly checks up to defaultSleepTimeout. If ActiveState of the given
// unit is active and LoadState of the given unit is loaded.
// If it cannot get the expected states within the period, return error.
func assertSystemdActiveState(unitName string) error {
fetchSystemdActiveState := func() error {
us, err := cAPI.UnitState(unitName)
if err != nil {
return fmt.Errorf("Error getting unit state of %s: %v", unitName, err)
}

var errU error
apiStates, errU = cAPI.UnitStates()
if errU != nil {
return fmt.Errorf("Error retrieving list of units: %v", errU)
// Get systemd state and check the state is active & loaded.
if us.SystemdActiveState != "active" || us.SystemdLoadState != "loaded" {
return fmt.Errorf("Failed to find an active unit %s", unitName)
}
return nil
}
return nil
}

// assertSystemdActiveState determines if a given systemd unit is actually
// in the active state, making use of cAPI
func assertSystemdActiveState(apiStates []*schema.UnitState, unitName string) error {
uState, err := getSingleUnitState(apiStates, unitName)
timeout, err := waitForState(fetchSystemdActiveState)
if err != nil {
return err
}

// Get systemd state and check the state is active & loaded.
if uState.ActiveState != "active" || uState.LoadState != "loaded" {
return fmt.Errorf("Failed to find an active unit %s", unitName)
return fmt.Errorf("Failed to find an active unit %s within %v, err: %v",
unitName, timeout, err)
}

return nil
}

// getSingleUnitState returns a single uState of type suState, which consists
// of necessary systemd states, only for a given unit name.
func getSingleUnitState(apiStates []*schema.UnitState, unitName string) (suState, error) {
for _, us := range apiStates {
if us.Name == unitName {
return suState{
us.SystemdLoadState,
us.SystemdActiveState,
us.SystemdSubState,
}, nil
}
}
return suState{}, fmt.Errorf("unit %s not found", unitName)
}

func machineState(machID string) (*machine.MachineState, error) {
machines, err := cAPI.Machines()
if err != nil {
Expand Down Expand Up @@ -1263,3 +1222,26 @@ func runWrapper(cf func(cCmd *cobra.Command, args []string) (exit int)) func(cCm
cmdExitCode = cf(cCmd, args)
}
}

// waitForState is a generic helper for repeatedly checking the status.
// It gets a generic function stateCheckFunc() to be checked, which returns
// nil on success, error otherwise. In case of failure, waitForState
// retries to run stateCheckFunc, once in 250 msec, up to defaultSleepTime.
func waitForState(stateCheckFunc func() error) (time.Duration, error) {
timeout := defaultSleepTime
alarm := time.After(timeout)
ticker := time.Tick(250 * time.Millisecond)

for {
select {
case <-alarm:
return timeout, fmt.Errorf("Failed to fetch systemd active states within %v", timeout)
case <-ticker:
err := stateCheckFunc()
if err == nil {
return timeout, nil
}
log.Debug("Retrying assertion of systemd active states. err: %v", err)
}
}
}
Loading

0 comments on commit 97a7adb

Please sign in to comment.