Skip to content

Commit

Permalink
Issue 2883 - AgentAutoUpgrade: Add node management worker in anax to …
Browse files Browse the repository at this point in the history
…handle nmps for the node

Signed-off-by: Max McAdam <[email protected]>
  • Loading branch information
MaxMcAdam committed Dec 1, 2021
1 parent e827337 commit 44cbd2e
Show file tree
Hide file tree
Showing 16 changed files with 788 additions and 40 deletions.
2 changes: 2 additions & 0 deletions changes/changes_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ func (w *ChangesWorker) findAndProcessChanges() {
}
} else if change.IsService() {
resourceTypes[events.CHANGE_SERVICE_TYPE] = true
} else if change.IsNMP() {
resourceTypes[events.CHANGE_NMP_TYPE] = true
} else {
glog.V(5).Infof(chglog(fmt.Sprintf("Unhandled change: %v %v/%v", change.Resource, change.OrgID, change.ID)))
}
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Config struct {
MaxAgreementPrelaunchTimeM int64 // The maximum numbers of minutes to wait for workload to start in an agreement
K8sCRInstallTimeoutS int64 // The number of seconds to wait for the custom resouce to install successfully before it is considered a failure
SecretsManagerFilePath string // The filepath for the secrets manager to store secrets in the agent filesystem
NodeMgmtWorkDirectory string // The filepath for the node management policy updates to use

// these Ids could be provided in config or discovered after startup by the system
BlockchainAccountId string
Expand Down Expand Up @@ -292,6 +293,13 @@ func (a *AGConfig) GetExchangeMessageTTL(maxHeartbeatInterval int) int {
return int(float64(hbInterval) * scaleFactor)
}

func (c *Config) GetNodeMgmtDirectory() string {
if c.NodeMgmtWorkDirectory == "" {
return fmt.Sprintf("%v/nmp", getDefaultBase())
}
return c.NodeMgmtWorkDirectory
}

func getDefaultBase() string {
basePath := os.Getenv("HZN_VAR_BASE")
if basePath == "" {
Expand Down
63 changes: 63 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ const (
OBJECT_POLICY_DELETED EventId = "OBJECT_POLICY_DELETED"
OBJECT_POLICIES_CHANGED EventId = "OBJECT_POLICIES_CHANGED"

// Node management policy
NMP_START_DOWNLOAD EventId = "NMP_START_DOWNLOAD"
NMP_DOWNLOAD_COMPLETE EventId = "NMP_DOWNLOAD_COMPLETE"

// Exchange change related
CHANGE_MESSAGE_TYPE EventId = "EXCHANGE_CHANGE_MESSAGE"
CHANGE_AGBOT_MESSAGE_TYPE EventId = "EXCHANGE_CHANGE_AGBOT_MESSAGE"
Expand All @@ -118,6 +122,7 @@ const (
CHANGE_AGBOT_PATTERN EventId = "EXCHANGE_CHANGE_AGBOT_PATTERN"
CHANGE_AGBOT_POLICY EventId = "EXCHANGE_CHANGE_AGBOT_POLICY"
CHANGE_AGBOT_AGREEMENT_TYPE EventId = "EXCHANGE_CHANGE_AGBOT_AGREEMENT"
CHANGE_NMP_TYPE EventId = "EXCHANGE_CHANGE_NODE_MANAGEMENT_POLICY"

// Secret related
UPDATED_SECRETS EventId = "SECRET_UPDATES"
Expand Down Expand Up @@ -2115,3 +2120,61 @@ func NewSecretUpdatesMessage(id EventId, sus *SecretUpdates) *SecretUpdatesMessa
Updates: *sus,
}
}

type NMPStartDownloadMessage struct {
event Event
Message StartDownloadMessage
}

type StartDownloadMessage struct {
FilesList []string
}

func (n *NMPStartDownloadMessage) Event() Event {
return n.event
}

func (n *NMPStartDownloadMessage) String() string {
return fmt.Sprintf("event: %v, Message: %v", n.event, n.Message)
}

func (n *NMPStartDownloadMessage) ShortString() string {
return n.String()
}

func NewNMPStartDownloadMessage(id EventId, message StartDownloadMessage) *NMPStartDownloadMessage {
return &NMPStartDownloadMessage{
event: Event{
Id: id,
},
Message: message,
}
}

type NMPDownloadCompleteMessage struct {
event Event
Success bool
NMPName string
}

func (n *NMPDownloadCompleteMessage) Event() Event {
return n.event
}

func (n *NMPDownloadCompleteMessage) String() string {
return fmt.Sprintf("event: %v, Success: %v", n.event, n.Success)
}

func (n *NMPDownloadCompleteMessage) ShortString() string {
return n.String()
}

func NewNMPDownloadCompleteMessage(id EventId, success bool, name string) *NMPDownloadCompleteMessage {
return &NMPDownloadCompleteMessage{
event: Event{
Id: id,
},
Success: success,
NMPName: name,
}
}
5 changes: 5 additions & 0 deletions exchange/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const RESOURCE_AGBOT_POLICY = "policy" // A policy change occu
const RESOURCE_AGBOT_SERVICE_POLICY = "servicepolicies" // A service policy changed
const RESOURCE_AGBOT_AGREEMENTS = "agbotagreements" // A change was made to one of the agreements on the agbot
const RESOURCE_ORG = "org" // A change was made to the org
const RESOURCE_NMP = "mgmtpolicy" // A change was made to a node management policy

// constants for operation values
const CHANGE_OPERATION_CREATED = "created"
Expand Down Expand Up @@ -138,6 +139,10 @@ func (e ExchangeChange) IsService() bool {
return e.Resource == RESOURCE_SERVICE
}

func (e ExchangeChange) IsNMP() bool {
return e.Resource == RESOURCE_NMP
}

func (e ExchangeChange) IsAgbotServedPolicy(agbot string) bool {
changeAgbot := fmt.Sprintf("%v/%v", e.OrgID, e.ID)
return changeAgbot == agbot && e.Resource == RESOURCE_AGBOT_SERVED_POLICY
Expand Down
14 changes: 10 additions & 4 deletions exchange/node_management_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ import (

const NMPExchangeResource = "managementpolicies"

type GetNMPResponse struct {
exchangecommon.ExchangeNodeManagementPolicy
LastUpdated string `json:"lastUpdated"`
Created string `json:"created"`
}

// Get the single node management policy from the exchange
func GetSingleExchangeNodeManagementPolicy(ec ExchangeContext, policyOrg string, policyName string) (*exchangecommon.ExchangeNodeManagementPolicy, error) {
glog.V(3).Infof("Getting node management policy : %v/%v", policyOrg, policyName)

var resp interface{}
resp = new(exchangecommon.ExchangeNodeManagementPolicy)
resp = new(GetNMPResponse)

targetURL := fmt.Sprintf("%vorgs/%v/%v/%v", ec.GetExchangeURL(), policyOrg, NMPExchangeResource, policyName)

Expand All @@ -22,12 +28,12 @@ func GetSingleExchangeNodeManagementPolicy(ec ExchangeContext, policyOrg string,
return nil, err
}

nmp := resp.(*exchangecommon.ExchangeNodeManagementPolicy)
return nmp, nil
nmp := resp.(GetNMPResponse).ExchangeNodeManagementPolicy
return &nmp, nil
}

type ExchangeNodeManagementPolicyResponse struct {
Policies map[string]exchangecommon.ExchangeNodeManagementPolicy `json:"managementPolicy"`
Policies map[string]exchangecommon.ExchangeNodeManagementPolicy `json:"managementPolicy,omitempty"`
LastIndex int `json:"lastIndex,omitempty"`
}

Expand Down
7 changes: 7 additions & 0 deletions exchange/node_management_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package exchange
import (
"fmt"
"github.com/golang/glog"
"github.com/open-horizon/anax/cutil"
"github.com/open-horizon/anax/exchangecommon"
)

Expand Down Expand Up @@ -31,6 +32,12 @@ func PutNodeManagementPolicyStatus(ec ExchangeContext, orgId string, nodeId stri
var resp interface{}
resp = new(PutPostDeleteStandardResponse)

org, name := cutil.SplitOrgSpecUrl(policyName)
if name == "" {
name = org
}
policyName = name

targetURL := fmt.Sprintf("%vorgs/%v/nodes/%v/managementStatus/%v", ec.GetExchangeURL(), orgId, nodeId, policyName)

err := InvokeExchangeRetryOnTransportError(ec.GetHTTPFactory(), "PUT", targetURL, ec.GetExchangeId(), ec.GetExchangeToken(), nmpStatus, &resp)
Expand Down
6 changes: 6 additions & 0 deletions exchange/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,12 @@ func InvokeExchange(httpClient *http.Client, method string, urlPath string, user
case *VaultSecretExistsResponse:
return nil, nil

case *GetNMPResponse:
return nil, nil

case *ExchangeNodeManagementPolicyResponse:
return nil, nil

default:
return errors.New(fmt.Sprintf("Unknown type of response object %v (%T) passed to invocation of %v at %v with %v", *resp, *resp, method, urlPath, requestBody)), nil
}
Expand Down
6 changes: 3 additions & 3 deletions exchangecommon/node_management_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ExchangeNodeManagementPolicy struct {
Properties externalpolicy.PropertyList `json:"properties"`
Patterns []string `json:"patterns"`
Enabled bool `json:"enabled"`
AgentAutoUpgradePolicy ExchangeAgentUpgradePolicy `json:"agentUpgradePolicy,omitempty"`
AgentAutoUpgradePolicy *ExchangeAgentUpgradePolicy `json:"agentUpgradePolicy,omitempty"`
LastUpdated string `json:"lastUpdated,omitempty"`
Created string `json:"created,omitempty"`
}
Expand Down Expand Up @@ -78,10 +78,10 @@ func (e *ExchangeNodeManagementPolicy) HasNoConstraints() bool {
}

func (e ExchangeNodeManagementPolicy) String() string {
return fmt.Sprintf("Owner: %v, Label: %v, Description: %v, Properties: %v, Constraints: %v, Patterns: %v, Enabled: %v, AgentAutoUpgradePolicy: %v, LastUpdated: %v",
return fmt.Sprintf("Owner: %v, Label: %v, Description: %v, Properties: %v, Constraints: %v, Patterns: %v, Enabled: %v, AgentAutoUpgradePolicy: %v, LastUpdated: %v, Created: %v",
e.Owner, e.Label, e.Description,
e.Properties, e.Constraints, e.Patterns,
e.Enabled, e.AgentAutoUpgradePolicy, e.LastUpdated)
e.Enabled, e.AgentAutoUpgradePolicy, e.LastUpdated, e.Created)
}

// The agent upgrade policy as stored in the exchange
Expand Down
93 changes: 85 additions & 8 deletions exchangecommon/node_management_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,95 @@ package exchangecommon

import (
"fmt"
"math/rand"
"time"
)

type NodeManagementPolicyStatus struct {
ScheduledTime string `json:"scheduledTime"`
ActualStartTime string `json:"startTime,omitempty"`
CompletionTime string `json:"endTime,omitempty"`
UpgradedVersion string `json:"upgradedVersion"`
Status string `json:"status"`
ErrorMessage string `json:"errorMessage,omitempty"`
AgentUpgrade *AgentUpgradePolicyStatus `json:"agentUpgrade"`
}

func (n NodeManagementPolicyStatus) String() string {
return fmt.Sprintf("ScheduledTime: %v, ActualStartTime: %v, CompletionTime: %v, UpgradedVersion: %v, Status: %v, ErrorMessage: %v",
n.ScheduledTime, n.ActualStartTime, n.CompletionTime, n.UpgradedVersion, n.Status, n.ErrorMessage)
return fmt.Sprintf("AgentUpgrade: %v", n.AgentUpgrade)
}

func (n NodeManagementPolicyStatus) Status() string {
if n.AgentUpgrade != nil {
return n.AgentUpgrade.Status
}
return ""
}

func (n NodeManagementPolicyStatus) SetStatus(status string) {
if n.AgentUpgrade != nil {
n.AgentUpgrade.Status = status
}
}

func (n NodeManagementPolicyStatus) SetErrorMessage(message string) {
if n.AgentUpgrade != nil {
n.AgentUpgrade.ErrorMessage = message
}
}

func (n NodeManagementPolicyStatus) SetCompletionTime(timeStr string) {
if n.AgentUpgrade != nil {
n.AgentUpgrade.CompletionTime = timeStr
}
}

func (n NodeManagementPolicyStatus) SetActualStartTime(timeStr string) {
if n.AgentUpgrade != nil {
n.AgentUpgrade.ActualStartTime = timeStr
}
}

type AgentUpgradePolicyStatus struct {
ScheduledTime string `json:"scheduledTime"`
scheduledUnixTime time.Time
ActualStartTime string `json:"startTime,omitempty"`
CompletionTime string `json:"endTime,omitempty"`
UpgradedVersion string `json:"upgradedVersion"`
Status string `json:"status"`
ErrorMessage string `json:"errorMessage,omitempty"`
BaseWorkingDirectory string `json:"workingDirectory"`
}

const (
STATUS_NEW = "waiting"
STATUS_UNKNOWN = "unknown"
STATUS_DOWNLOADED = "downloaded"
STATUS_DOWNLOAD_FAILED = "failed download"
STATUS_SUCCESSFUL = "successful"
STATUS_FAILED_JOB = "failed"
STATUS_INITIATED = "initiated"
)

func (a AgentUpgradePolicyStatus) String() string {
return fmt.Sprintf("ScheduledTime: %v, ActualStartTime: %v, CompletionTime: %v, UpgradedVersion: %v, Status: %v, ErrorMessage: %v, BaseWorkingDirectory: %v",
a.ScheduledTime, a.ActualStartTime, a.CompletionTime, a.UpgradedVersion, a.Status, a.ErrorMessage, a.BaseWorkingDirectory)
}

func StatusFromNewPolicy(policy ExchangeNodeManagementPolicy, workingDir string) NodeManagementPolicyStatus {
newStatus := NodeManagementPolicyStatus{
AgentUpgrade: &AgentUpgradePolicyStatus{Status: STATUS_NEW},
}
if policy.AgentAutoUpgradePolicy != nil {
startTime, _ := time.Parse(time.RFC3339, policy.AgentAutoUpgradePolicy.PolicyUpgradeTime)
realStartTime := startTime.Unix()
if policy.AgentAutoUpgradePolicy.UpgradeWindowDuration > 0 {
realStartTime = realStartTime + int64(rand.Intn(policy.AgentAutoUpgradePolicy.UpgradeWindowDuration))
}
newStatus.AgentUpgrade.ScheduledTime = time.Unix(realStartTime, 0).Format(time.RFC3339)
newStatus.AgentUpgrade.scheduledUnixTime = time.Unix(realStartTime, 0)
newStatus.AgentUpgrade.BaseWorkingDirectory = workingDir
}
return newStatus
}

func (n NodeManagementPolicyStatus) TimeToStart() bool {
if n.AgentUpgrade != nil {
return n.AgentUpgrade.scheduledUnixTime.Before(time.Now())
}
return false
}
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
_ "github.com/open-horizon/anax/i18n_messages"
"github.com/open-horizon/anax/imagefetch"
"github.com/open-horizon/anax/kube_operator"
_"github.com/open-horizon/anax/nodemanagement"
"github.com/open-horizon/anax/persistence"
"github.com/open-horizon/anax/policy"
"github.com/open-horizon/anax/resource"
Expand Down Expand Up @@ -189,6 +190,7 @@ func main() {
workers.Add(kube_operator.NewKubeWorker("Kube", cfg, db))
workers.Add(resource.NewResourceWorker("Resource", cfg, db, authm))
workers.Add(changes.NewChangesWorker("ExchangeChanges", cfg, db))
// workers.Add(nodemanagement.NewNodeManagementWorker("NodeManagement", cfg, db))
}

// Get into the event processing loop until anax shuts itself down.
Expand Down
54 changes: 54 additions & 0 deletions nodemanagement/commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package nodemanagement

import (
"fmt"
"github.com/open-horizon/anax/events"
)

type NodeRegisteredCommand struct {
Msg *events.EdgeRegisteredExchangeMessage
}

func (d NodeRegisteredCommand) ShortString() string {
return fmt.Sprintf("Msg: %v", d.Msg)
}

func (d NodeRegisteredCommand) String() string {
return d.ShortString()
}

func NewNodeRegisteredCommand(msg *events.EdgeRegisteredExchangeMessage) *NodeRegisteredCommand {
return &NodeRegisteredCommand{
Msg: msg,
}
}

type NMPDownloadCompleteCommand struct {
Msg *events.NMPDownloadCompleteMessage
}

func (n NMPDownloadCompleteCommand) String() string {
return fmt.Sprintf("Msg: %v", n.Msg)
}

func (n NMPDownloadCompleteCommand) ShortString() string {
return n.String()
}

func NewNMPDownloadCompleteCommand(msg *events.NMPDownloadCompleteMessage) *NMPDownloadCompleteCommand {
return &NMPDownloadCompleteCommand{Msg: msg}
}

type NodeShutdownCommand struct {
Msg *events.NodeShutdownMessage
}

func (n NodeShutdownCommand) ShortString() string {
return fmt.Sprintf("NodeShutdownCommand Msg: %v", n.Msg)
}

func NewNodeShutdownCommand(msg *events.NodeShutdownMessage) *NodeShutdownCommand {
return &NodeShutdownCommand{
Msg: msg,
}
}
Loading

0 comments on commit 44cbd2e

Please sign in to comment.