From b6b70ae87137ec485a70f4fa6594c7d8fdea3f41 Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Thu, 24 Mar 2016 10:15:38 +0100 Subject: [PATCH] registry: remove units from etcd registry upon DestroyUnit() So far, each "fleetctl destroy unit" command has removed job entries from the etcd registry, under /_coreos.com/fleet/job. However, it has not removed the corresponding unit file, under /_coreos.com/fleet/unit. As a result, fleet left a lot of garbage in the etcd registry, which users had to clean up manually. This patch actually deletes the unit contents from the etcd registry when DestroyUnit() is called. To avoid potential hash collisions, it first fetches the list of units from the registry to check whether there is any duplicated entry. Only if no duplicated unit is found does fleetd actually delete the unit from the registry. Fixes: https://github.com/coreos/fleet/issues/1456 Fixes: https://github.com/coreos/fleet/issues/1290 Reference: https://github.com/coreos/fleet/pull/1291 --- registry/job.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) diff --git a/registry/job.go b/registry/job.go index 53935f8f4..19634c028 100644 --- a/registry/job.go +++ b/registry/job.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" "path" + "reflect" "sort" + "strings" etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client" @@ -284,6 +286,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc } +// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and +// instantiates and returns a representative *job.Unit, transitively fetching +// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(), +// this does not use node.Value itself as a hash key, but it uses the last part +// of node.Key for the hash key. 
+func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) { + var err error + var jm jobModel + + if err = unmarshal(node.Value, &jm); err != nil { + return nil, err + } + + parts := strings.Split(node.Key, "/") + if len(parts) == 0 { + log.Errorf("key '%v' doesn't have enough parts", node.Key) + return nil, nil + } + stringHash := parts[len(parts)-1] + + hashKey, err := unit.HashFromHexString(stringHash) + if err != nil { + log.Errorf("cannot convert key string into hash. %v", err) + return nil, nil + } + + var unit *unit.UnitFile + + unit = unitHashLookupFunc(hashKey) + if unit == nil { + log.Warningf("No Unit found in Registry for Job(%s)", jm.Name) + return nil, nil + } + + ju := &job.Unit{ + Name: jm.Name, + Unit: *unit, + } + return ju, nil + +} + // jobModel is used for serializing and deserializing Jobs stored in the Registry type jobModel struct { Name string @@ -297,7 +341,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { opts := &etcd.DeleteOptions{ Recursive: true, } - _, err := r.kAPI.Delete(r.ctx(), key, opts) + u, err := r.Unit(name) + if err != nil { + log.Warningf("r.Unit error, name=%s\n", name) + u = nil + } + _, err = r.kAPI.Delete(r.ctx(), key, opts) if err != nil { if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { err = errors.New("job does not exist") @@ -307,9 +356,78 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { } // TODO(jonboulle): add unit reference counting and actually destroying Units + + // Delete unit from the etcd registry + if u != nil { + // check if the unit is really valid. If not, return err. 
+ key = r.hashedUnitPath(u.Unit.Hash()) + unitMatch, err := r.checkUnitMatch(u) + if err != nil { + return fmt.Errorf("Failed checking unit validity") + } + if unitMatch { + return fmt.Errorf("Invalid unit in the etcd registry: not deleting from registry.") + } + _, err = r.kAPI.Delete(r.ctx(), key, opts) + if err != nil { + // NOTE: unable to delete the key, but it's practically no big deal, + // as the garbage will be later cleaned up on the etcd side. + return fmt.Errorf("Failed deleting unit from registry: %s", err) + } + } return nil } +// checkUnitMatch() determines if the given unit is a really valid entry in the +// etcd registry, by querying the entries via RPC. +func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) { + key := path.Join(r.keyPrefix, unitPrefix) + opts := &etcd.GetOptions{ + Recursive: true, + } + res, err := r.kAPI.Get(r.ctx(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = nil + } + return false, err + } + + return r.checkUnitSiblings(unitDel, res.Node) +} + +// checkUnitSiblings() returns true if there's a duplicated entry already in +// the etcd registry. +func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) { + uhashKey := dir.Key + unitDelName := r.hashedUnitPath(unitDel.Unit.Hash()) + for _, uhashNode := range dir.Nodes { + newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash) + if err != nil { + log.Errorf("cannot get unit. err: %v", err) + return false, err + } + if newUnit == nil { + log.Debugf("unable to parse Unit in Registry at key %s", uhashKey) + continue + } + + if unitDelName == uhashNode.Key { + log.Debugf("skipping the entry itself.") + continue + } + + if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) { + // matched. 
so this unit has a duplicated entry, so return + log.Debugf("won't erase this key, as a duplicated entry is found.") + return true, nil + } + } + + log.Debugf("no matching entry, so it can be removed.") + return false, nil +} + // CreateUnit attempts to store a Unit and its associated unit file in the registry func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { if err := r.storeOrGetUnitFile(u.Unit); err != nil {