Skip to content

Commit

Permalink
registry: remove units from etcd registry upon DestroyUnit()
Browse files Browse the repository at this point in the history
So far each command "fleetctl destroy unit" has removed job entries from
the etcd registry, under /_coreos.com/fleet/job. But it has not removed
its unit file, under /_coreos.com/fleet/unit. As a result, fleet left
lots of garbages in the etcd registry, so users had to manually clean
them up.

So this patch gets unit contents deleted actually from etcd registry
when DestroyUnit() gets called. To avoid potential hash collisions,
it first fetches a list of units from registry, to check there's any
duplicated entry. Only if no duplicated unit is found, fleetd actually
deletes the unit from registry.

Fixes: coreos#1456
Fixes: coreos#1290
Reference: coreos#1291
  • Loading branch information
Dongsu Park committed Apr 19, 2016
1 parent e03f384 commit 24c31d5
Showing 1 changed file with 119 additions and 1 deletion.
120 changes: 119 additions & 1 deletion registry/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"errors"
"fmt"
"path"
"reflect"
"sort"
"strings"

etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client"

Expand Down Expand Up @@ -284,6 +286,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc

}

// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and
// instantiates and returns a representative *job.Unit, transitively fetching
// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(),
// this does not use not.Value itself as a hash key, but it uses the last part
// of node.Key for the hash key.
func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) {
var err error
var jm jobModel

if err = unmarshal(node.Value, &jm); err != nil {
return nil, err
}

parts := strings.Split(node.Key, "/")
if len(parts) == 0 {
log.Errorf("key '%v' doesn't have enough parts", node.Key)
return nil, nil
}
stringHash := parts[len(parts)-1]

hashKey, err := unit.HashFromHexString(stringHash)
if err != nil {
log.Errorf("cannot convert key string into hash. %v", err)
return nil, nil
}

var unit *unit.UnitFile

unit = unitHashLookupFunc(hashKey)
if unit == nil {
log.Warningf("No Unit found in Registry for Job(%s)", jm.Name)
return nil, nil
}

ju := &job.Unit{
Name: jm.Name,
Unit: *unit,
}
return ju, nil

}

// jobModel is used for serializing and deserializing Jobs stored in the Registry
type jobModel struct {
Name string
Expand All @@ -297,7 +341,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error {
opts := &etcd.DeleteOptions{
Recursive: true,
}
_, err := r.kAPI.Delete(r.ctx(), key, opts)
u, err := r.Unit(name)
if err != nil {
log.Warningf("r.Unit error, name=%s\n", name)
u = nil
}
_, err = r.kAPI.Delete(r.ctx(), key, opts)
if err != nil {
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
err = errors.New("job does not exist")
Expand All @@ -307,9 +356,78 @@ func (r *EtcdRegistry) DestroyUnit(name string) error {
}

// TODO(jonboulle): add unit reference counting and actually destroying Units

// Delete unit from the etcd registry
if u != nil {
// check if the unit is really valid. If not, return err.
key = r.hashedUnitPath(u.Unit.Hash())
unitMatch, err := r.checkUnitMatch(u)
if err != nil {
return fmt.Errorf("Failed checking unit validity")
}
if unitMatch {
return fmt.Errorf("Invalid unit in the etcd registry: not deleting from registry.")
}
_, err = r.kAPI.Delete(r.ctx(), key, opts)
if err != nil {
// NOTE: unable to delete the key, but it's practically no big deal,
// as the garbage will be later cleaned up on the etcd side.
return fmt.Errorf("Failed deleting unit from registry: %s", err)
}
}
return nil
}

// checkUnitMatch() determines if the given unit is a really valid entry in the
// etcd registry, by querying the entries via RPC.
func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) {
key := path.Join(r.keyPrefix, unitPrefix)
opts := &etcd.GetOptions{
Recursive: true,
}
res, err := r.kAPI.Get(r.ctx(), key, opts)
if err != nil {
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
err = nil
}
return false, err
}

return r.checkUnitSiblings(unitDel, res.Node)
}

// checkUnitSiblings() returns true if there's a duplicated entry already in
// the etcd registry.
func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) {
uhashKey := dir.Key
unitDelName := r.hashedUnitPath(unitDel.Unit.Hash())
for _, uhashNode := range dir.Nodes {
newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash)
if err != nil {
log.Errorf("cannot get unit. err: %v", err)
return false, err
}
if newUnit == nil {
log.Debugf("unable to parse Unit in Registry at key %s", uhashKey)
continue
}

if unitDelName == uhashNode.Key {
log.Debugf("skipping the entry itself.")
continue
}

if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) {
// matched. so this unit has a duplicated entry, so return
log.Debugf("won't erase this key, as a duplicated entry is found.")
return true, nil
}
}

log.Debugf("no matching entry, so it can be removed.")
return false, nil
}

// CreateUnit attempts to store a Unit and its associated unit file in the registry
func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) {
if err := r.storeOrGetUnitFile(u.Unit); err != nil {
Expand Down

0 comments on commit 24c31d5

Please sign in to comment.