Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add guard against NodeStatus #22

Merged
merged 10 commits into from
Jun 1, 2023
68 changes: 62 additions & 6 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"

log "github.com/sirupsen/logrus"

argoerrs "github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/util/slice"
)
Expand Down Expand Up @@ -1704,6 +1706,64 @@ func (n Nodes) Find(f func(NodeStatus) bool) *NodeStatus {
return nil
}

// Get a NodeStatus from the hashmap of Nodes.
// Return a nil along with an error if non existent.
func (n Nodes) Get(key string) (*NodeStatus, error) {
isubasinghe marked this conversation as resolved.
Show resolved Hide resolved
val, ok := n[key]
if !ok {
return nil, fmt.Errorf("key was not found for %s", key)
}
return &val, nil
}

// Check if the Nodes map has a key entry
func (n Nodes) Has(key string) bool {
_, err := n.Get(key)
return err == nil
}

// Get the Phase of a Node
func (n Nodes) GetPhase(key string) (*NodePhase, error) {
val, err := n.Get(key)
if err != nil {
return nil, err
}
return &val.Phase, nil
}

// Set the status of a node by key
func (n Nodes) Set(key string, status NodeStatus) {
if status.Name == "" {
log.Warnf("Name was not set for key %s", key)
}
if status.ID == "" {
log.Warnf("ID was not set for key %s", key)
}
_, ok := n[key]
if ok {
log.Tracef("Changing NodeStatus for %s to %+v", key, status)
}
n[key] = status
}

// Delete a node from the Nodes by key
func (n Nodes) Delete(key string) {
has := n.Has(key)
if !has {
log.Warnf("Trying to delete non existent key %s", key)
return
}
delete(n, key)
}

// Get the name of a node by key
func (n Nodes) GetName(key string) (string, error) {
val, err := n.Get(key)
if err != nil {
return "", err
}
return val.Name, nil
}
func NodeWithName(name string) func(n NodeStatus) bool {
return func(n NodeStatus) bool { return n.Name == name }
}
Expand Down Expand Up @@ -3203,13 +3263,9 @@ func (wf *Workflow) GetTemplateByName(name string) *Template {
return nil
}

func (wf *Workflow) GetNodeByName(nodeName string) *NodeStatus {
func (wf *Workflow) GetNodeByName(nodeName string) (*NodeStatus, error) {
nodeID := wf.NodeID(nodeName)
node, ok := wf.Status.Nodes[nodeID]
if !ok {
return nil
}
return &node
return wf.Status.Nodes.Get(nodeID)
}

// GetResourceScope returns the template scope of workflow.
Expand Down
19 changes: 15 additions & 4 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,16 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
kubeClient := auth.GetKubeClient(ctx)

var art *wfv1.Artifact

nodeStatus, err := wf.Status.Nodes.Get(nodeId)
if err != nil {
log.Errorf("Was unable to retrieve node for %s", nodeId)
return nil, nil, fmt.Errorf("was not able to retrieve node")
}
if isInput {
art = wf.Status.Nodes[nodeId].Inputs.GetArtifactByName(artifactName)
art = nodeStatus.Inputs.GetArtifactByName(artifactName)
} else {
art = wf.Status.Nodes[nodeId].Outputs.GetArtifactByName(artifactName)
art = nodeStatus.Outputs.GetArtifactByName(artifactName)
}
if art == nil {
return nil, nil, fmt.Errorf("artifact not found: %s, isInput=%t, Workflow Status=%+v", artifactName, isInput, wf.Status)
Expand All @@ -395,7 +401,12 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
// 5. Inline Template

var archiveLocation *wfv1.ArtifactLocation
templateName := util.GetTemplateFromNode(wf.Status.Nodes[nodeId])
templateNode, err := wf.Status.Nodes.Get(nodeId)
if err != nil {
log.Errorf("was unable to retrieve node for %s", nodeId)
return nil, nil, fmt.Errorf("Unable to get artifact and driver due to inability to get node due for %s, err=%s", nodeId, err)
}
templateName := util.GetTemplateFromNode(*templateNode)
if templateName != "" {
template := wf.GetTemplateByName(templateName)
if template == nil {
Expand All @@ -412,7 +423,7 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
archiveLocation = ar.ToArtifactLocation()
}

err := art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to
err = art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to
if err != nil {
return art, nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ func (t *Then) ExpectArtifact(nodeName string, artifactName string, bucketName s
nodeName = t.wf.Name
}

n := t.wf.GetNodeByName(nodeName)
n, err := t.wf.GetNodeByName(nodeName)
if err != nil {
t.t.Error("was unable to get node by name")
}
a := n.GetOutputs().GetArtifactByName(artifactName)
key, _ := a.GetKey()

Expand Down
12 changes: 9 additions & 3 deletions util/resource/updater.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package resource

import (
log "github.com/sirupsen/logrus"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

Expand All @@ -14,7 +16,7 @@ func UpdateResourceDurations(wf *wfv1.Workflow) {
} else if node.Fulfilled() {
// compute the sum of all children
node.ResourcesDuration = resourceDuration(wf, node, make(map[string]bool))
wf.Status.Nodes[nodeID] = node
wf.Status.Nodes.Set(nodeID, node)
}
}
}
Expand All @@ -27,11 +29,15 @@ func resourceDuration(wf *wfv1.Workflow, node wfv1.NodeStatus, visited map[strin
continue
}
visited[childID] = true
child := wf.Status.Nodes[childID]
child, err := wf.Status.Nodes.Get(childID)
if err != nil {
log.Warnf("was unable to obtain node for %s", childID)
continue
}
if child.Type == wfv1.NodeTypePod {
v = v.Add(child.ResourcesDuration)
}
v = v.Add(resourceDuration(wf, child, visited))
v = v.Add(resourceDuration(wf, *child, visited))
}
return v
}
14 changes: 9 additions & 5 deletions workflow/controller/artifact_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strate
groupedByPod[podName] = make(templatesToArtifacts)
}
// get the Template for the Artifact
node, found := woc.wf.Status.Nodes[artifactSearchResult.NodeID]
if !found {
node, err := woc.wf.Status.Nodes.Get(artifactSearchResult.NodeID)
if err != nil {
woc.log.Errorf("Was unable to obtain node for %s", artifactSearchResult.NodeID)
return fmt.Errorf("can't process Artifact GC Strategy %s: node ID %q not found in Status??", strategy, artifactSearchResult.NodeID)
}
templateName := node.TemplateName
Expand Down Expand Up @@ -631,8 +632,9 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask
foundGCFailure := false
for nodeName, nodeResult := range artifactGCTask.Status.ArtifactResultsByNode {
// find this node result in the Workflow Status
wfNode, found := woc.wf.Status.Nodes[nodeName]
if !found {
wfNode, err := woc.wf.Status.Nodes.Get(nodeName)
if err != nil {
woc.log.Errorf("Was unable to obtain node for %s", nodeName)
return false, fmt.Errorf("node named %q returned by WorkflowArtifactGCTask %q wasn't found in Workflow %q Status", nodeName, artifactGCTask.Name, woc.wf.Name)
}
if wfNode.Outputs == nil {
Expand All @@ -645,7 +647,9 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask
// could be in a different WorkflowArtifactGCTask
continue
}
woc.wf.Status.Nodes[nodeName].Outputs.Artifacts[i].Deleted = artifactResult.Success

wfNode.Outputs.Artifacts[i].Deleted = artifactResult.Success
woc.wf.Status.Nodes.Set(nodeName, *wfNode)

if artifactResult.Error != nil {
woc.addArtGCCondition(fmt.Sprintf("%s (artifactGCTask: %s)", *artifactResult.Error, artifactGCTask.Name))
Expand Down
8 changes: 4 additions & 4 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
node := woc.wf.GetNodeByName(nodeName)
if node == nil {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending)
}
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
Expand All @@ -30,8 +30,8 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
// which prevents creating many pending nodes that could never be scheduled
for _, c := range tmpl.ContainerSet.GetContainers() {
ctxNodeName := fmt.Sprintf("%s.%s", nodeName, c.Name)
ctrNode := woc.wf.GetNodeByName(ctxNodeName)
if ctrNode == nil {
_, err := woc.wf.GetNodeByName(ctxNodeName)
if err != nil {
_ = woc.initializeNode(ctxNodeName, wfv1.NodeTypeContainer, templateScope, orgTmpl, node.ID, wfv1.NodePending)
}
}
Expand Down
Loading