Skip to content

Commit

Permalink
Merge pull request #2898 from dlarson04/issue2879
Browse files Browse the repository at this point in the history
Issue 2879 - assess each object manager policy
  • Loading branch information
linggao authored Nov 1, 2021
2 parents 2092062 + fbf0fa1 commit 811a1fe
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 60 deletions.
12 changes: 10 additions & 2 deletions agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,11 +1077,19 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,
objPolicies := b.mmsObjMgr.GetObjectPolicies(agreement.Org, serviceNamePieces[0], serviceNamePieces[2], serviceNamePieces[1])

destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0)
if addedToList, _, err := AssignObjectToNodes(b, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, false); err != nil {
destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0)

if err := AssignObjectToNodes(b, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, destsToDeleteMap, nil, false); err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to assign object(s) to node %v, error %v", agreement.DeviceId, err)))
} else if addedToList {
}

if len(destsToAddMap) > 0 {
AddDestinationsForObjects(b, destsToAddMap)
}
if len(destsToDeleteMap) > 0 {
DeleteDestinationsForObjects(b, destsToDeleteMap)
}

}
} else if b.GetCSSURL() == "" {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to evaluate object placement because there is no CSS URL configured in this agbot")))
Expand Down
158 changes: 124 additions & 34 deletions agreementbot/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,60 @@ import (
"strings"
)

const DESTINATIONS_LOG_LIMIT = 50

// This function is called when an object is ready to be deployed to a node. It will perform the policy compatibility test
// if necessary and will then update the object's destination list in the CSS.
func AssignObjectToNodes(ec exchange.ExchangeContext, objPolicies *exchange.ObjectDestinationPolicies, nodeId string, nodePolicy *policy.Policy, destsToAddMap map[string]*exchange.ObjectDestinationsToAdd, knownCompatible bool) (bool, map[string]*exchange.ObjectDestinationsToAdd, error) {
// if necessary and will then update the object's destination list (either adding or removing) in the CSS. If node is
// appropriately in or out of the destination map already, no action will be taken.
func AssignObjectToNodes(ec exchange.ExchangeContext,
objPolicies *exchange.ObjectDestinationPolicies,
nodeId string,
nodePolicy *policy.Policy,
destsToAddMap map[string]*exchange.ObjectDestinationsToAdd,
destsToDeleteMap map[string]*exchange.ObjectDestinationsToDelete,
destNodesMap map[string][]string,
knownCompatible bool) error {

if len(*objPolicies) == 0 {
return false, destsToAddMap, nil
return nil
}

getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec)
getObjDestHandler := exchange.GetHTTPObjectDestinationQueryHandler(ec)

// For each object policy received, make sure the object is still valid, evaluate it against the node policy if necessary,
// and then update the object's destination list.
for _, objPol := range *objPolicies {

objectKey := getObjectKey(objPol.OrgID, objPol.ObjectType, objPol.ObjectID)

if obj, err := getObjectHandler(objPol.OrgID, objPol.ObjectID, objPol.ObjectType); err != nil {
glog.Errorf(opLogstring(fmt.Sprintf("error reading object %v %v %v, %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err)))
} else if obj == nil {
glog.Warningf(opLogstring(fmt.Sprintf("object %v %v %v has been deleted", objPol.OrgID, objPol.ObjectID, objPol.ObjectType)))
continue
}

currentDestNodes := make([]string, 0)

// When called from HandleMMSObjectPolicy, caller will pass in the destinations list to avoid retrieving it multiple times. Other callers will depend on this function to get the current list
if destNodesMap != nil {
currentDestNodes = destNodesMap[objectKey]
} else {

currentObjDestinations := new(exchange.ObjectDestinationStatuses)
// Grab the current destinations of the object.
if dests, err := getObjDestHandler(objPol.OrgID, objPol.ObjectID, objPol.ObjectType); err != nil {
glog.Errorf(opLogstring(fmt.Sprintf("error reading object %v %v %v destinations, %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err)))
} else if dests != nil {
currentObjDestinations = dests
}
// Construct a list of destinations where the object currently lives
for _, destStatus := range *currentObjDestinations {
currentDestNodes = append(currentDestNodes, destStatus.DestID)
}
}

// The caller might have already done the compatibility test.
if !knownCompatible {
if glog.V(5) {
Expand All @@ -56,33 +90,48 @@ func AssignObjectToNodes(ec exchange.ExchangeContext, objPolicies *exchange.Obje
// properties plus service policy properties in the model policy properties.
nodePolicy.Constraints = []string{}

// Check if node and model polices are compatible. Incompatible policies are not necessarily an error so just log a warning and return.
// Check if node and model polices are compatible. Incompatible policies are not necessarily an error so just log a warning.
// If the node is in the destination list, the return code will indicate to remove it and then return.
if err := policy.Are_Compatible(nodePolicy, internalObjPol, nil); err != nil {
glog.Warningf(opLogstring(fmt.Sprintf("error matching node policy %v and object policy %v, error: %v", nodePolicy, internalObjPol, err)))
return false, destsToAddMap, nil

// If it was in the destination list, need to remove it
if cutil.SliceContains(currentDestNodes, exchange.GetId(nodeId)) {
UnassignObjectFromNodes(ec, &objPol, nodeId, destsToDeleteMap)
}

continue
} else {
glog.V(3).Infof(opLogstring(fmt.Sprintf("node %v is compatible with object %v/%v with type %v", nodeId, objPol.OrgID, objPol.ObjectID, objPol.ObjectType)))
}
}

// Policies are compatible so add this node to destination list for the object.
dest := "openhorizon.edgenode:" + exchange.GetId(nodeId)
if glog.V(5) {
glog.Infof(opLogstring(fmt.Sprintf("adding node %v to destination list for object %v:%v:%v", dest, objPol.OrgID, objPol.ObjectType, objPol.ObjectID)))
}
// Policies are compatible so add this node to destination list for the object if it is not currently there.
found := cutil.SliceContains(currentDestNodes, exchange.GetId(nodeId))

objKey := getObjectKey(objPol.OrgID, objPol.ObjectType, objPol.ObjectID)
if _, ok := destsToAddMap[objKey]; !ok {
destsToAdd := new(exchange.ObjectDestinationsToAdd)
(*destsToAdd) = append((*destsToAdd), dest)
destsToAddMap[objKey] = destsToAdd
if found {
if glog.V(5) {
glog.Infof(opLogstring(fmt.Sprintf("node %v already found in destination list", exchange.GetId(nodeId))))
}
} else {
destsToAdd := destsToAddMap[objKey]
(*destsToAdd) = append((*destsToAdd), dest)

dest := "openhorizon.edgenode:" + exchange.GetId(nodeId)
if glog.V(5) {
glog.Infof(opLogstring(fmt.Sprintf("adding node %v to destination list for object %v:%v:%v", dest, objPol.OrgID, objPol.ObjectType, objPol.ObjectID)))
}
if _, ok := destsToAddMap[objectKey]; !ok {
destsToAdd := new(exchange.ObjectDestinationsToAdd)
(*destsToAdd) = append((*destsToAdd), dest)
destsToAddMap[objectKey] = destsToAdd
} else {
destsToAdd := destsToAddMap[objectKey]
(*destsToAdd) = append((*destsToAdd), dest)
}
}

}
return true, destsToAddMap, nil

return nil
}

// This function is called to remove an object from a node. It is assumed that the caller has already done the
Expand All @@ -107,14 +156,42 @@ func UnassignObjectFromNodes(ec exchange.ExchangeContext, objPol *exchange.Objec
return nil
}

// Send a bulk add/delete to CSS if batch limit size reached
func (w *BaseAgreementWorker) CheckDestListBulkBatchSize(destsToAddMap map[string]*exchange.ObjectDestinationsToAdd, destsToDeleteMap map[string]*exchange.ObjectDestinationsToDelete, CSSDestinationBatchSize int) (map[string]*exchange.ObjectDestinationsToAdd, map[string]*exchange.ObjectDestinationsToDelete) {
// bulk Add and Remove destinations if limit reached
addDestLimitReached := false
for _, destsToAddTmp := range destsToAddMap {
if len(*destsToAddTmp) > CSSDestinationBatchSize {
addDestLimitReached = true
}
}
if addDestLimitReached {
AddDestinationsForObjects(w, destsToAddMap)
destsToAddMap = make(map[string]*exchange.ObjectDestinationsToAdd, 0)
}
deleteDestLimitReached := false
for _, destsToDeleteTmp := range destsToDeleteMap {
if len(*destsToDeleteTmp) > CSSDestinationBatchSize {
deleteDestLimitReached = true
}
}
if deleteDestLimitReached {
DeleteDestinationsForObjects(w, destsToDeleteMap)
destsToDeleteMap = make(map[string]*exchange.ObjectDestinationsToDelete, 0)
}

return destsToAddMap, destsToDeleteMap
}

func AddDestinationsForObjects(ec exchange.ExchangeContext, destsToAddMap map[string]*exchange.ObjectDestinationsToAdd) {
glog.V(3).Infof(opLogstring(fmt.Sprintf("Start to call CSS to add destinations")))
postDestHandler := exchange.GetHTTPAddOrRemoveObjectDestinationHandler(ec)
for key, destsToAdd := range destsToAddMap {
objOrg, objType, objID := extractObjectKey(key)
glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations for object %v of type %v", len(*destsToAdd), objID, objType)))
if glog.V(3) && len(*destsToAdd) < 50 {
glog.Infof(opLogstring(fmt.Sprintf("Added destinations: %v", *destsToAdd)))
if len(*destsToAdd) < DESTINATIONS_LOG_LIMIT {
glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations %v for object %v of type %v", len(*destsToAdd), *destsToAdd, objID, objType)))
} else {
glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations %v... for object %v of type %v", len(*destsToAdd), (*destsToAdd)[:DESTINATIONS_LOG_LIMIT], objID, objType)))
}

postDestRequest := exchange.PostDestsRequest{
Expand All @@ -138,9 +215,10 @@ func DeleteDestinationsForObjects(ec exchange.ExchangeContext, destsToDeleteMap
getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec)
for key, destsToDelete := range destsToDeleteMap {
objOrg, objType, objID := extractObjectKey(key)
glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations for object %v of type %v", len(*destsToDelete), objID, objType)))
if glog.V(3) && len(*destsToDelete) < 50 {
glog.Infof(opLogstring(fmt.Sprintf("Deleted destinations: %v", *destsToDelete)))
if len(*destsToDelete) < DESTINATIONS_LOG_LIMIT {
glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations %v for object %v of type %v", len(*destsToDelete), *destsToDelete, objID, objType)))
} else {
glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations %v... for object %v of type %v", len(*destsToDelete), (*destsToDelete)[:DESTINATIONS_LOG_LIMIT], objID, objType)))
}

postDestRequest := exchange.PostDestsRequest{
Expand Down Expand Up @@ -205,16 +283,28 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,
glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy NewPolicy: %v", newPolicy)))
}

CSSDestinationBatchSize := w.config.AgreementBot.CSSDestinationBatchSize

// Construct a list of destinations where the object currently lives. These will be in the policy update (the new policy).
destNodes := make([]string, 0, 5)
destNodes := make([]string, 0, len(newPolicy.Destinations))
for _, dest := range newPolicy.Destinations {
destNodes = append(destNodes, dest.DestID)
}

if glog.V(5) {
glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes: %v", destNodes)))
// Avoid building very long log messages
if len(destNodes) < DESTINATIONS_LOG_LIMIT {
glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes: %v", destNodes)))
} else {
glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes has length %v", len(destNodes))))
}
}

// Key: {objOrg}:{objType}:{objID}, value: list of current destinations
destNodesMap := make(map[string][]string, 1)
mapDestNodesKey := getObjectKey(newPolicy.OrgID, newPolicy.ObjectType, newPolicy.ObjectID)
destNodesMap[mapDestNodesKey] = destNodes

inProgress := func() persistence.AFilter {
return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 }
}
Expand Down Expand Up @@ -272,6 +362,7 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,
// Key: {objOrg}:{objType}:{objID}, value: list of destinations to add/delete
destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0)
destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0)

for _, agreement := range agreements {

// if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy
Expand All @@ -287,18 +378,15 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,

// if agreement's node's policy is compatible with new object policy
// add the agreement's node to object's destination list to bulk add
addedToList, _, err := AssignObjectToNodes(w, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, false)
err := AssignObjectToNodes(w, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, destsToDeleteMap, destNodesMap, false)
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err)))
} else if !addedToList {
// else
// add the agreement's node to destination list to bulk delete
err := UnassignObjectFromNodes(w, &newPolicy, agreement.DeviceId, destsToDeleteMap)
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err)))
}
}
}

// bulk Add and Remove destinations if limit reached
destsToAddMap, destsToDeleteMap = w.CheckDestListBulkBatchSize(destsToAddMap, destsToDeleteMap, CSSDestinationBatchSize)

continue
}

Expand Down Expand Up @@ -332,6 +420,8 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler,
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err)))
}
// bulk Add and Remove destinations if limit reached
destsToAddMap, destsToDeleteMap = w.CheckDestListBulkBatchSize(destsToAddMap, destsToDeleteMap, CSSDestinationBatchSize)
} else {
// else
// nothing to do, assume that the agbot which owns the agreement for the other services will handle this same policy change event appropriately
Expand Down
25 changes: 14 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type AGConfig struct {
PolicySearchOrder bool // When true, search policies from most recently changed to least recently changed.
Vault VaultConfig // The hashicorp vault config to connect to and fetch secrets from.
SecretsUpdateCheck int // The number of seconds between checks for updated secrets.
CSSDestinationBatchSize int // The max number of destination updates to send to CSS in a single update.
}

// Contains the hashicorp vault configuration used within AGConfig.
Expand Down Expand Up @@ -370,16 +371,17 @@ func Read(file string) (*HorizonConfig, error) {
K8sCRInstallTimeoutS: K8sCRInstallTimeoutS_DEFAULT,
},
AgreementBot: AGConfig{
MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT,
AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT,
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT,
SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT,
MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT,
AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT,
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT,
SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT,
CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT,
},
}

Expand Down Expand Up @@ -554,6 +556,7 @@ func (agc *AGConfig) String() string {
", CheckUpdatedPolicyS: %v"+
", CSSURL: %v"+
", CSSSSLCert: %v"+
", CSSDestinationBatchSize: %v"+
", AgreementBatchSize: %v"+
", AgreementQueueSize: %v"+
", MessageQueueScale: %v"+
Expand All @@ -569,7 +572,7 @@ func (agc *AGConfig) String() string {
agc.IgnoreContractWithAttribs, agc.ExchangeURL, agc.ExchangeHeartbeat, agc.ExchangeId,
mask, agc.DVPrefix, agc.ActiveDeviceTimeoutS, agc.ExchangeMessageTTL, agc.MessageKeyPath, mask, agc.APIListen,
agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey,
agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.AgreementBatchSize,
agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.MaxExchangeChanges,
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault)
}
Expand Down
3 changes: 3 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,6 @@ const K8sCRInstallTimeoutS_DEFAULT = 180

// Time between secret update checks
const SecretsUpdateCheck_DEFAULT = 60

// Batch destination size to send to CSS
const AgbotCSSDestinationBatchSize_DEFAULT = 200
Loading

0 comments on commit 811a1fe

Please sign in to comment.