Skip to content

Commit

Permalink
Added IsFailurePermanent flag on DynamicTaskStatus (flyteorg#567)
Browse files Browse the repository at this point in the history
* added IsFailurePermanent flag on DynamicTaskStatus

Signed-off-by: Daniel Rammer <[email protected]>

* fixed linter

Signed-off-by: Daniel Rammer <[email protected]>

* cleaned up retryable vs permanent reporting

Signed-off-by: Daniel Rammer <[email protected]>

* make generate

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jul 6, 2023
1 parent eda29dd commit 7010392
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 13 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ type ExecutableDynamicNodeStatus interface {
GetDynamicNodePhase() DynamicNodePhase
GetDynamicNodeReason() string
GetExecutionError() *core.ExecutionError
GetIsFailurePermanent() bool
}

type MutableDynamicNodeStatus interface {
Expand All @@ -235,6 +236,7 @@ type MutableDynamicNodeStatus interface {
SetDynamicNodePhase(phase DynamicNodePhase)
SetDynamicNodeReason(reason string)
SetExecutionError(executionError *core.ExecutionError)
SetIsFailurePermanent(isFailurePermanent bool)
}

// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ const (

// DynamicNodeStatus stores the status recorded for a dynamic node: its phase,
// a reason string, an optional execution error, and a flag marking the failure
// as permanent. Each field is serialized under the JSON key in its tag and is
// omitted when empty.
// NOTE(review): Phase, Reason and Error appear twice below; this looks like the
// before/after sides of a diff rendered without +/- markers — confirm against
// the real file before relying on this listing.
type DynamicNodeStatus struct {
MutableStruct
Phase DynamicNodePhase `json:"phase,omitempty"`
Reason string `json:"reason,omitempty"`
Error *ExecutionError `json:"error,omitempty"`
Phase DynamicNodePhase `json:"phase,omitempty"`
Reason string `json:"reason,omitempty"`
Error *ExecutionError `json:"error,omitempty"`
IsFailurePermanent bool `json:"permFailure,omitempty"`
}

func (in *DynamicNodeStatus) GetDynamicNodePhase() DynamicNodePhase {
Expand All @@ -116,6 +117,10 @@ func (in *DynamicNodeStatus) GetExecutionError() *core.ExecutionError {
return in.Error.ExecutionError
}

// GetIsFailurePermanent returns the current value of the IsFailurePermanent
// field of this dynamic node status.
func (in *DynamicNodeStatus) GetIsFailurePermanent() bool {
return in.IsFailurePermanent
}

func (in *DynamicNodeStatus) SetDynamicNodeReason(reason string) {
if in.Reason != reason {
in.SetDirty()
Expand All @@ -138,6 +143,13 @@ func (in *DynamicNodeStatus) SetExecutionError(err *core.ExecutionError) {
}
}

// SetIsFailurePermanent stores isFailurePermanent on the status. The status is
// marked dirty only when the stored value actually changes; setting the value
// it already holds is a no-op.
func (in *DynamicNodeStatus) SetIsFailurePermanent(isFailurePermanent bool) {
	// Nothing to record when the flag already has the requested value.
	if in.IsFailurePermanent == isFailurePermanent {
		return
	}

	in.SetDirty()
	in.IsFailurePermanent = isFailurePermanent
}

func (in *DynamicNodeStatus) Equals(o *DynamicNodeStatus) bool {
if in == nil && o == nil {
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,12 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,

// As we do not support Failure Node, we can just return failure in this case
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)),
handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Dynamic workflow failed", Error: state.Err},
nil
handler.DynamicNodeState{
Phase: v1alpha1.DynamicNodePhaseFailing,
Reason: "Dynamic workflow failed",
Error: state.Err,
IsFailurePermanent: state.HasFailed(),
}, nil
}

if state.IsComplete() {
Expand Down
13 changes: 10 additions & 3 deletions flytepropeller/pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,18 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod
return trns, err
}

// TODO: Use Execution Error for ds.Error type to propagate the recoverable flag and determine if the error is retryable.
// if DynamicNodeStatus is noted with permanent failures we report a non-recoverable failure
phaseInfoFailureFunc := handler.PhaseInfoRetryableFailure
phaseInfoFailureFuncErr := handler.PhaseInfoRetryableFailureErr
if ds.IsFailurePermanent {
phaseInfoFailureFunc = handler.PhaseInfoFailure
phaseInfoFailureFuncErr = handler.PhaseInfoFailureErr
}

if ds.Error != nil {
trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailureErr(ds.Error, nil))
trns = handler.DoTransition(handler.TransitionTypeEphemeral, phaseInfoFailureFuncErr(ds.Error, nil))
} else {
trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(core.ExecutionError_UNKNOWN, "DynamicNodeFailing", ds.Reason, nil))
trns = handler.DoTransition(handler.TransitionTypeEphemeral, phaseInfoFailureFunc(core.ExecutionError_UNKNOWN, "DynamicNodeFailing", ds.Reason, nil))
}
case v1alpha1.DynamicNodePhaseParentFinalizing:
if err := d.finalizeParentNode(ctx, nCtx); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions flytepropeller/pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytestdlib/storage"
)

// This is the legacy state structure that gets translated to node status
Expand All @@ -31,9 +30,10 @@ type BranchNodeState struct {
type DynamicNodePhase uint8

// DynamicNodeState is the handler-side state of a dynamic node: its phase, a
// reason string, an optional execution error, and a flag marking the failure
// as permanent.
// NOTE(review): Phase, Reason and Error appear twice below; this looks like the
// before/after sides of a diff rendered without +/- markers — confirm against
// the real file before relying on this listing.
type DynamicNodeState struct {
Phase v1alpha1.DynamicNodePhase
Reason string
Error *core.ExecutionError
Phase v1alpha1.DynamicNodePhase
Reason string
Error *core.ExecutionError
IsFailurePermanent bool
}

type WorkflowNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (n nodeStateManager) GetDynamicNodeState() handler.DynamicNodeState {
ds.Phase = dn.GetDynamicNodePhase()
ds.Reason = dn.GetDynamicNodeReason()
ds.Error = dn.GetExecutionError()
ds.IsFailurePermanent = dn.GetIsFailurePermanent()
}

return ds
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t.SetDynamicNodePhase(n.d.Phase)
t.SetDynamicNodeReason(n.d.Reason)
t.SetExecutionError(n.d.Error)
t.SetIsFailurePermanent(n.d.IsFailurePermanent)
}

// Update branch node status
Expand Down

0 comments on commit 7010392

Please sign in to comment.