Skip to content

Commit

Permalink
Merge pull request #2673 from LiilyZhang/zhangl/Issue2554
Browse files Browse the repository at this point in the history
Issue 2554 - Agent does not get MMS model when 2 agbots overwrite eac…
  • Loading branch information
dabooz authored Aug 6, 2021
2 parents acd1541 + b4480ae commit 71fe09b
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 89 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ AGBOT_REGISTRY ?= $(DOCKER_REGISTRY)
# The CSS and its production container. This container is NOT used by hzn dev.
CSS_EXECUTABLE := css/cloud-sync-service
CSS_CONTAINER_DIR := css
CSS_IMAGE_VERSION ?= 1.6.5$(BRANCH_NAME)
CSS_IMAGE_VERSION ?= 1.6.6$(BRANCH_NAME)
CSS_IMAGE_BASE = image/cloud-sync-service
CSS_IMAGE_NAME = $(IMAGE_REPO)/$(arch)_cloud-sync-service
CSS_IMAGE = $(CSS_IMAGE_NAME):$(CSS_IMAGE_VERSION)
Expand All @@ -100,7 +100,7 @@ CSS_IMAGE_LABELS ?= --label "name=$(arch)_cloud-sync-service" --label "version=$
# The hzn dev ESS/CSS and its container.
ESS_EXECUTABLE := ess/edge-sync-service
ESS_CONTAINER_DIR := ess
ESS_IMAGE_VERSION ?= 1.6.5$(BRANCH_NAME)
ESS_IMAGE_VERSION ?= 1.6.6$(BRANCH_NAME)
ESS_IMAGE_BASE = image/edge-sync-service
ESS_IMAGE_NAME = $(IMAGE_REPO)/$(arch)_edge-sync-service
ESS_IMAGE = $(ESS_IMAGE_NAME):$(ESS_IMAGE_VERSION)
Expand Down
6 changes: 4 additions & 2 deletions agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,10 +1072,12 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,

objPolicies := b.mmsObjMgr.GetObjectPolicies(agreement.Org, serviceNamePieces[0], serviceNamePieces[2], serviceNamePieces[1])

if _, err := AssignObjectToNode(b, objPolicies, agreement.DeviceId, nodePolicy, false); err != nil {
destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0)
if addedToList, _, err := AssignObjectToNodes(b, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, false); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to assign object(s) to node %v, error %v", agreement.DeviceId, err)))
} else if addedToList {
AddDestinationsForObjects(b, destsToAddMap)
}

}
} else if b.GetCSSURL() == "" {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to evaluate object placement because there is no CSS URL configured in this agbot")))
Expand Down
168 changes: 97 additions & 71 deletions agreementbot/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@ import (
"strings"
)


// This function is called when an object is ready to be deployed to a node. It will perform the policy compatibility test
// if necessary and will then update the object's destination list in the CSS.
func AssignObjectToNode(ec exchange.ExchangeContext, objPolicies *exchange.ObjectDestinationPolicies, nodeId string, nodePolicy *policy.Policy, knownCompatible bool) (bool, error) {

func AssignObjectToNodes(ec exchange.ExchangeContext, objPolicies *exchange.ObjectDestinationPolicies, nodeId string, nodePolicy *policy.Policy, destsToAddMap map[string]*exchange.ObjectDestinationsToAdd, knownCompatible bool) (bool, map[string]*exchange.ObjectDestinationsToAdd, error) {
if len(*objPolicies) == 0 {
return false, nil
return false, destsToAddMap, nil
}

updateDestHandler := exchange.GetHTTPUpdateObjectDestinationHandler(ec)
getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec)
objDestHandler := exchange.GetHTTPObjectDestinationQueryHandler(ec)

currentObjDestinations := new(exchange.ObjectDestinationStatuses)

// For each object policy received, make sure the object is still valid, evaluate it against the node policy if necessary,
// and then update the object's destination list.
Expand Down Expand Up @@ -60,87 +56,97 @@ func AssignObjectToNode(ec exchange.ExchangeContext, objPolicies *exchange.Objec
// Check if node and model polices are compatible. Incompatible policies are not necessarily an error so just log a warning and return.
if err := policy.Are_Compatible(nodePolicy, internalObjPol, nil); err != nil {
glog.Warningf(opLogstring(fmt.Sprintf("error matching node policy %v and object policy %v, error: %v", nodePolicy, internalObjPol, err)))
return false, nil
return false, destsToAddMap, nil
} else {
glog.V(3).Infof(opLogstring(fmt.Sprintf("node %v is compatible with object %v/%v with type %v", nodeId, objPol.OrgID, objPol.ObjectID, objPol.ObjectType)))
}
}

// Grab the current destinations of the object.
if dests, err := objDestHandler(objPol.OrgID, objPol.ObjectID, objPol.ObjectType); err != nil {
glog.Errorf(opLogstring(fmt.Sprintf("error reading object %v %v %v destinations, %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err)))
} else if dests != nil {
currentObjDestinations = dests
}
// Policies are compatible so add this node to destination list for the object.
dest := "openhorizon.edgenode:" + exchange.GetId(nodeId)
glog.V(5).Infof(opLogstring(fmt.Sprintf("adding node %v to destination list for object %v:%v:%v", dest, objPol.OrgID, objPol.ObjectType, objPol.ObjectID)))

// Policies are compatible so place this object on the node. If the node is not in
// the destination list of the object, add it.
pdlr := new(exchange.PutDestinationListRequest)
found := false
for _, destStatus := range *currentObjDestinations {
if destStatus.DestID == exchange.GetId(nodeId) {
// Found it, no need to update the destination list.
found = true
break
} else {
// The destination list update is a full replace so we have to capture all the current destinations as
// we iterate the current list.
(*pdlr) = append((*pdlr), destStatus.DestType+":"+destStatus.DestID)
}
}

if !found {
(*pdlr) = append((*pdlr), "openhorizon.edgenode:"+exchange.GetId(nodeId))

// The update could fail if the object has been deleted in this small window.
if err := updateDestHandler(objPol.OrgID, &objPol, pdlr); err != nil {
glog.Warningf(opLogstring(fmt.Sprintf("failed to update object %v %v %v destination list, error %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err)))
} else {
glog.V(3).Infof(opLogstring(fmt.Sprintf("updated destination list for object %v of type %v with node %v", objPol.ObjectID, objPol.ObjectType, nodeId)))
}
objKey := getObjectKey(objPol.OrgID, objPol.ObjectType, objPol.ObjectID)
if _, ok := destsToAddMap[objKey]; !ok {
destsToAdd := new(exchange.ObjectDestinationsToAdd)
(*destsToAdd) = append((*destsToAdd), dest)
destsToAddMap[objKey] = destsToAdd
} else {
glog.V(5).Infof(opLogstring(fmt.Sprintf("node %v is already a destination for object %v with type %v", nodeId, objPol.ObjectID, objPol.ObjectType)))
destsToAdd := destsToAddMap[objKey]
(*destsToAdd) = append((*destsToAdd), dest)
}

}
return true, nil
return true, destsToAddMap, nil
}

// This function is called to remove an object from a node. It is assumed that the caller has already done the
// policy compatibility check.
func UnassignObjectFromNode(ec exchange.ExchangeContext, objPol *exchange.ObjectDestinationPolicy, nodeId string) error {
func UnassignObjectFromNodes(ec exchange.ExchangeContext, objPol *exchange.ObjectDestinationPolicy, nodeId string, destsToDeleteMap map[string]*exchange.ObjectDestinationsToDelete) error {

glog.V(5).Infof(opLogstring(fmt.Sprintf("unassign object %v %v %v from node %v", objPol.OrgID, objPol.ObjectType, objPol.ObjectID, nodeId)))
glog.V(5).Infof(opLogstring(fmt.Sprintf("removing node %v from destination list for object %v:%v:%v", nodeId, objPol.OrgID, objPol.ObjectType, objPol.ObjectID)))

updateDestHandler := exchange.GetHTTPUpdateObjectDestinationHandler(ec)
getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec)
pdlr := new(exchange.PutDestinationListRequest)
dest := "openhorizon.edgenode:" + exchange.GetId(nodeId)
objKey := getObjectKey(objPol.OrgID, objPol.ObjectType, objPol.ObjectID)
if _, ok := destsToDeleteMap[objKey]; !ok {
destsToDelete := new(exchange.ObjectDestinationsToDelete)
(*destsToDelete) = append((*destsToDelete), dest)
destsToDeleteMap[objKey] = destsToDelete
} else {
destsToDelete := destsToDeleteMap[objKey]
(*destsToDelete) = append((*destsToDelete), dest)
}

found := false
for _, destStatus := range objPol.Destinations {
if destStatus.DestID == exchange.GetId(nodeId) {
found = true
return nil
}

func AddDestinationsForObjects(ec exchange.ExchangeContext, destsToAddMap map[string]*exchange.ObjectDestinationsToAdd) {
glog.V(3).Infof(opLogstring(fmt.Sprintf("Start to call CSS to add destinations")))
postDestHandler := exchange.GetHTTPAddOrRemoveObjectDestinationHandler(ec)
for key, destsToAdd := range destsToAddMap {
objOrg, objType, objID := extractObjectKey(key)
glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations %v for object %v of type %v", len(*destsToAdd), *destsToAdd, objID, objType)))

postDestRequest := exchange.PostDestsRequest{
Action: common.AddAction,
Destinations: *destsToAdd,
}

// The update could fail if the object has been deleted in this small window.
if err := postDestHandler(objOrg, objType, objID, &postDestRequest); err != nil {
glog.Warningf(opLogstring(fmt.Sprintf("failed to add destination(s) to object %v %v %v, error %v", objOrg, objType, objID, err)))
} else {
// The destination list update is a full replace so we have to capture all the current destinations as
// we iterate the current list.
(*pdlr) = append((*pdlr), destStatus.DestType+":"+destStatus.DestID)
glog.V(3).Infof(opLogstring(fmt.Sprintf("destinations added for object %v of type %v", objID, objType)))
}

}
}

func DeleteDestinationsForObjects(ec exchange.ExchangeContext, destsToDeleteMap map[string]*exchange.ObjectDestinationsToDelete) {
glog.V(3).Infof(opLogstring(fmt.Sprintf("Start to call CSS to delete destinations")))
postDestHandler := exchange.GetHTTPAddOrRemoveObjectDestinationHandler(ec)
getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec)
for key, destsToDelete := range destsToDeleteMap {
objOrg, objType, objID := extractObjectKey(key)
glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations %v for object %v of type %v", len(*destsToDelete) , *destsToDelete, objID, objType)))

glog.V(5).Infof(opLogstring(fmt.Sprintf("new destination list %v", *pdlr)))
postDestRequest := exchange.PostDestsRequest{
Action: common.RemoveAction,
Destinations: *destsToDelete,
}

if found {
// The update could fail if the object has been deleted. That should be treated as an expected error.
if obj, err := getObjectHandler(objPol.OrgID, objPol.ObjectID, objPol.ObjectType); err != nil {
glog.Errorf(opLogstring(fmt.Sprintf("object %v %v %v destination cannot be updated, %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err)))
// The update could fail if the object has been deleted. That should be treated as an expected error.
if obj, err := getObjectHandler(objOrg, objID, objType); err != nil {
glog.Errorf(opLogstring(fmt.Sprintf("object %v %v %v destination cannot be deleted, %v", objOrg, objType, objID, err)))
} else if obj == nil {
glog.Warningf(opLogstring(fmt.Sprintf("object %v %v %v has been deleted", objPol.OrgID, objPol.ObjectID, objPol.ObjectType)))
} else if err := updateDestHandler(objPol.OrgID, objPol, pdlr); err != nil {
glog.Errorf(opLogstring(fmt.Sprintf("%v", err)))
glog.Warningf(opLogstring(fmt.Sprintf("object %v %v %v has been deleted", objOrg, objType, objID)))
} else if err := postDestHandler(objOrg, objType, objID, &postDestRequest); err != nil {
glog.Warningf(opLogstring(fmt.Sprintf("failed to delete destination(s) to object %v %v %v, error %v", objOrg, objType, objID, err)))
} else {
glog.V(3).Infof(opLogstring(fmt.Sprintf("updated destination list for object %v/%v of type %v to remove node %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, nodeId)))
glog.V(3).Infof(opLogstring(fmt.Sprintf("destinations deleted for object %v of type %v", objID, objType)))
}

}
return nil
}

// MMS object policy changes can cause a significant impact to where objects are placed throughout the entire system.
Expand Down Expand Up @@ -246,6 +252,9 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,
objPolicies := new(exchange.ObjectDestinationPolicies)
(*objPolicies) = append((*objPolicies), newPolicy)

// Key: {objOrg}:{objType}:{objID}, value: list of destinations to add/delete
destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0)
destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0)
for _, agreement := range agreements {

// if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy
Expand All @@ -260,16 +269,14 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,
} else {

// if agreement's node's policy is compatible with new object policy
// if agreement's node is NOT in current obj dest list, then
// add the agreement's node to object's destination list
added, err := AssignObjectToNode(w, objPolicies, agreement.DeviceId, nodePolicy, false)
// add the agreement's node to object's destination list to bulk add
addedToList, _, err := AssignObjectToNodes(w, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, false)
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err)))
} else if !added {
} else if !addedToList {
// else
// if agreement's node is in the object's destination list, then
// remove the agreement's node from obj destination list
err := UnassignObjectFromNode(w, &newPolicy, agreement.DeviceId)
// add the agreement's node to destination list to bulk delete
err := UnassignObjectFromNodes(w, &newPolicy, agreement.DeviceId, destsToDeleteMap)
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err)))
}
Expand Down Expand Up @@ -304,7 +311,7 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,
// if none of them are in new policy then
if !hasRunningService(ns.RunningServices, &newPolicy, workerId, w.config.ArchSynonyms) {
// remove the node from the dest list
err := UnassignObjectFromNode(w, &newPolicy, agreement.DeviceId)
err := UnassignObjectFromNodes(w, &newPolicy, agreement.DeviceId, destsToDeleteMap)
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err)))
}
Expand All @@ -331,6 +338,15 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,

}

// bulk Add and Remove destinations
if len(destsToAddMap) != 0 {
AddDestinationsForObjects(w, destsToAddMap)
}

if len(destsToDeleteMap) != 0 {
DeleteDestinationsForObjects(w, destsToDeleteMap)
}

glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("done with MMS Object Policy event: %v", wi)))

return
Expand Down Expand Up @@ -464,7 +480,17 @@ func hasRunningService(allRunningServices string, newPolicy *exchange.ObjectDest
return false
}

func getObjectKey(objOrg string, objType string, objID string) string {
return fmt.Sprintf("%v:%v:%v", objOrg, objType, objID)
}

func extractObjectKey(objKey string) (string, string, string) {
parts := strings.Split(objKey, ":")
return parts[0], parts[1], parts[2]
}

// =============================================================================================================
var opLogstring = func(v interface{}) string {
return fmt.Sprintf("Object Policy: %v", v)
}

25 changes: 17 additions & 8 deletions exchange/css.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,21 @@ func (d ObjectDestinationPolicy) String() string {
return fmt.Sprintf("Object Destination Policy: Org %v, Type %v, ID %v, %v, Destinations %v", d.OrgID, d.ObjectType, d.ObjectID, d.DestinationPolicy, d.Destinations)
}

type PostDestsRequest struct {
// Action is "add" or "remove"
Action string `json:"action"`

// Destinations is an array of destinations, each entry is an string in form of "<destinationType>:<destinationID>"
Destinations []string `json:"destinations"`
}

type ObjectDestinationPolicies []ObjectDestinationPolicy

type ObjectDestinationStatuses []common.DestinationsStatus

type PutDestinationListRequest []string
type ObjectDestinationsToAdd []string

type ObjectDestinationsToDelete []string

// Query the CSS to retrieve object policy for a given service id.
func GetObjectsByService(ec ExchangeContext, org string, serviceId string) (*ObjectDestinationPolicies, error) {
Expand Down Expand Up @@ -140,19 +150,19 @@ func GetUpdatedObjects(ec ExchangeContext, org string, since int64) (*ObjectDest
}
}

// Update the destination list of the object when that object's policy enables it to be placed on the node.
func UpdateObjectDestinationList(ec ExchangeContext, org string, objPol *ObjectDestinationPolicy, dests *PutDestinationListRequest) error {

// Add or Remove the destinations of the object when that object's policy enables it to be placed on the node.
func AddOrRemoveDestinations(ec ExchangeContext, org string, objType string, objID string, postDestsRequest *PostDestsRequest) error {
// There is no response to CSS API.
var resp interface{}

url := path.Join("/api/v1/objects", org, objPol.ObjectType, objPol.ObjectID, "destinations")
url := path.Join("/api/v1/objects", org, objType, objID, "destinations")
url = ec.GetCSSURL() + url

retryCount := ec.GetHTTPFactory().RetryCount
retryInterval := ec.GetHTTPFactory().GetRetryInterval()

for {
if err, tpErr := InvokeExchange(ec.GetHTTPFactory().NewHTTPClient(nil), "PUT", url, ec.GetExchangeId(), ec.GetExchangeToken(), dests, &resp); err != nil {
if err, tpErr := InvokeExchange(ec.GetHTTPFactory().NewHTTPClient(nil), "POST", url, ec.GetExchangeId(), ec.GetExchangeToken(), postDestsRequest, &resp); err != nil {
glog.Errorf(rpclogString(fmt.Sprintf(err.Error())))
return err
} else if tpErr != nil {
Expand All @@ -168,11 +178,10 @@ func UpdateObjectDestinationList(ec ExchangeContext, org string, objPol *ObjectD
continue
}
} else {
glog.V(5).Infof(rpclogString(fmt.Sprintf("updated destination list for object %v of type %v with %v", objPol.ObjectID, objPol.ObjectType, dests)))
glog.V(5).Infof(rpclogString(fmt.Sprintf("%s destinations for object %v of type %v with %v", postDestsRequest.Action, objID, objType, postDestsRequest.Destinations)))
return nil
}
}

}

// Get the object's metadata.
Expand Down
10 changes: 5 additions & 5 deletions exchange/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,12 @@ func GetHTTPObjectDestinationQueryHandler(ec ExchangeContext) ObjectDestinationQ
}
}

// A handler for updating the list of object destinations in the Model Management System.
type UpdateObjectDestinationHandler func(org string, objPol *ObjectDestinationPolicy, dests *PutDestinationListRequest) error
// A handler for add or delete the object destinations in the Model Management System.
type AddOrRemoveObjectDestinationHandler func(org string, objType string, objID string, destsRequest *PostDestsRequest) error

func GetHTTPUpdateObjectDestinationHandler(ec ExchangeContext) UpdateObjectDestinationHandler {
return func(org string, objPol *ObjectDestinationPolicy, dests *PutDestinationListRequest) error {
return UpdateObjectDestinationList(ec, org, objPol, dests)
func GetHTTPAddOrRemoveObjectDestinationHandler(ec ExchangeContext) AddOrRemoveObjectDestinationHandler {
return func(org string, objType string, objID string, destsRequest *PostDestsRequest) error {
return AddOrRemoveDestinations(ec, org, objType, objID, destsRequest)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/mibk/dupl v1.0.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/open-horizon/edge-sync-service v1.6.4
github.com/open-horizon/edge-sync-service v1.6.5
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152
github.com/open-horizon/rsapss-tool v0.0.0-20190416131035-2fc75eb3b6ea
github.com/opencontainers/image-spec v1.0.2-0.20181011182654-b6e51fa50549 // indirect
Expand Down

0 comments on commit 71fe09b

Please sign in to comment.