Skip to content

Commit

Permalink
Simplify GetNodeExecutionStatus (flyteorg#294)
Browse files Browse the repository at this point in the history
* Revert "Construct subnode DataDir to be under parent's node OutputDir to keep behavior consistent across (flyteorg#292)" (flyteorg#293)

This reverts commit 051eba7.

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Simplify GetNodeExecutionStatus

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* SetDataDir in cached DynamicWorkflow case

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Fix case in which downstream nodes may incorrectly fail because of parentNodeID #minor (flyteorg#288)

* Branch canexecute fix

Signed-off-by: Ketan Umare <[email protected]>

* tests

Signed-off-by: Ketan Umare <[email protected]>

* more unit test fixes

Signed-off-by: Ketan Umare <[email protected]>

* fixed tests

Signed-off-by: Ketan Umare <[email protected]>

* more documentation

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Haytham Abuelfutuh <[email protected]>

* [wip] Working for ev1 to be merged into flyteorg#294 (flyteorg#295)

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Haytham Abuelfutuh <[email protected]>

* bump for DCO

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Fix DataDir of child nodes

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Fix mocks for tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* fixed unit test

Signed-off-by: Ketan Umare <[email protected]>

Co-authored-by: Ketan Umare <[email protected]>
Co-authored-by: Ketan Umare <[email protected]>
  • Loading branch information
3 people authored Jul 21, 2021
1 parent 59069d2 commit fc9e555
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 89 deletions.
6 changes: 3 additions & 3 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
propeller:
rawoutput-prefix: s3://my-container/test/
metadata-prefix: metadata/propeller/sandbox
workers: 4
workers: 1
workflow-reeval-duration: 10s
downstream-eval-duration: 5s
limit-namespace: "all"
Expand All @@ -23,7 +23,7 @@ propeller:
rate: 100
capacity: 1000
# This config assumes `make start` was run in the flytesnacks repo to start up a DinD k3s container
kube-config: "$HOME/kubeconfig/k3s/k3s.yaml"
kube-config: "$HOME/.flyte/k3s/k3s.yaml"
publish-k8s-events: true
workflowStore:
policy: "ResourceVersionCache"
Expand Down Expand Up @@ -102,5 +102,5 @@ catalog-cache:
endpoint: datacatalog:8089
insecure: true
logger:
level: 4
level: 5
show-source: true
76 changes: 34 additions & 42 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"strconv"
"time"
Expand Down Expand Up @@ -517,62 +518,53 @@ func (in NodeStatus) GetTaskNodeStatus() ExecutableTaskNodeStatus {
return in.TaskNodeStatus
}

func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus {
n, ok := in.SubNodeStatus[id]
if ok {
n.SetParentTaskID(in.GetParentTaskID())
n.DataReferenceConstructor = in.DataReferenceConstructor
if len(n.GetDataDir()) == 0 {
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetDataDir(), id)
if err != nil {
logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id)
return n
}

n.SetDataDir(dataDir)
func (in *NodeStatus) setEphemeralNodeExecutionStatusAttributes(ctx context.Context, id NodeID, n *NodeStatus) error {
n.SetParentTaskID(in.GetParentTaskID())
if len(n.GetDataDir()) == 0 {
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetOutputDir(), id)
if err != nil {
return fmt.Errorf("failed to construct data dir for node [%v]. Error: %w", id, err)
}

if len(n.GetOutputDir()) == 0 {
outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, n.GetDataDir(), strconv.FormatUint(uint64(in.Attempts), 10))
if err != nil {
logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id)
return n
}
n.SetDataDir(dataDir)
}

n.SetOutputDir(outputDir)
if len(n.GetOutputDir()) == 0 {
outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, n.GetDataDir(), strconv.FormatUint(uint64(n.Attempts), 10))
if err != nil {
return fmt.Errorf("failed to construct output dir for node [%v]. Error: %w", id, err)
}

return n
n.SetOutputDir(outputDir)
}

if in.SubNodeStatus == nil {
in.SubNodeStatus = make(map[NodeID]*NodeStatus)
}
n.DataReferenceConstructor = in.DataReferenceConstructor

newNodeStatus := &NodeStatus{
MutableStruct: MutableStruct{},
}
newNodeStatus.SetParentTaskID(in.GetParentTaskID())
newNodeStatus.SetParentNodeID(in.GetParentNodeID())
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetDataDir(), id)
if err != nil {
logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id)
return n
return nil
}

func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus {
n, ok := in.SubNodeStatus[id]
if !ok {
if in.SubNodeStatus == nil {
in.SubNodeStatus = make(map[NodeID]*NodeStatus)
}

n = &NodeStatus{
MutableStruct: MutableStruct{},
}

in.SubNodeStatus[id] = n
in.SetDirty()
}

outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, dataDir, "0")
err := in.setEphemeralNodeExecutionStatusAttributes(ctx, id, n)
if err != nil {
logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id)
logger.Errorf(ctx, "Failed to set node attributes for node [%v]. Error: %v", id, err)
return n
}

newNodeStatus.SetDataDir(dataDir)
newNodeStatus.SetOutputDir(outputDir)
newNodeStatus.DataReferenceConstructor = in.DataReferenceConstructor

in.SubNodeStatus[id] = newNodeStatus
in.SetDirty()
return newNodeStatus
return n
}

func (in *NodeStatus) IsTerminated() bool {
Expand Down
62 changes: 62 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package v1alpha1

import (
"context"
"encoding/json"
"testing"

"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -190,3 +193,62 @@ func TestDynamicNodeStatus_SetExecutionError(t *testing.T) {
})
}
}

func TestNodeStatus_GetNodeExecutionStatus(t *testing.T) {
ctx := context.Background()
t.Run("First Level", func(t *testing.T) {
t.Run("Not cached", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())
})

t.Run("cached", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())

newNode = n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())
})

t.Run("cached but datadir not populated", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{
"abc": {},
},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())
})
})

t.Run("Nested", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())

subsubNode := newNode.GetNodeExecutionStatus(ctx, "xyz")
assert.Equal(t, storage.DataReference("/abc/0/xyz/0"), subsubNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc/0/xyz"), subsubNode.GetDataDir())
})
}
Loading

0 comments on commit fc9e555

Please sign in to comment.