diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index 8b6c02318f..33aa857d5b 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -226,6 +226,7 @@ type ExecutableDynamicNodeStatus interface { GetDynamicNodePhase() DynamicNodePhase GetDynamicNodeReason() string GetExecutionError() *core.ExecutionError + GetIsFailurePermanent() bool } type MutableDynamicNodeStatus interface { @@ -235,6 +236,7 @@ type MutableDynamicNodeStatus interface { SetDynamicNodePhase(phase DynamicNodePhase) SetDynamicNodeReason(reason string) SetExecutionError(executionError *core.ExecutionError) + SetIsFailurePermanent(isFailurePermanent bool) } // ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go index 849e4729c4..130e5d13f8 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go @@ -111,3 +111,35 @@ func (_m *ExecutableDynamicNodeStatus) GetExecutionError() *core.ExecutionError return r0 } + +type ExecutableDynamicNodeStatus_GetIsFailurePermanent struct { + *mock.Call +} + +func (_m ExecutableDynamicNodeStatus_GetIsFailurePermanent) Return(_a0 bool) *ExecutableDynamicNodeStatus_GetIsFailurePermanent { + return &ExecutableDynamicNodeStatus_GetIsFailurePermanent{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableDynamicNodeStatus) OnGetIsFailurePermanent() *ExecutableDynamicNodeStatus_GetIsFailurePermanent { + c_call := _m.On("GetIsFailurePermanent") + return &ExecutableDynamicNodeStatus_GetIsFailurePermanent{Call: c_call} +} + +func (_m *ExecutableDynamicNodeStatus) OnGetIsFailurePermanentMatch(matchers ...interface{}) *ExecutableDynamicNodeStatus_GetIsFailurePermanent { + c_call := _m.On("GetIsFailurePermanent", matchers...) + return &ExecutableDynamicNodeStatus_GetIsFailurePermanent{Call: c_call} +} + +// GetIsFailurePermanent provides a mock function with given fields: +func (_m *ExecutableDynamicNodeStatus) GetIsFailurePermanent() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go index 4299feb2db..bb102d58ba 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go @@ -112,6 +112,38 @@ func (_m *MutableDynamicNodeStatus) GetExecutionError() *core.ExecutionError { return r0 } +type MutableDynamicNodeStatus_GetIsFailurePermanent struct { + *mock.Call +} + +func (_m MutableDynamicNodeStatus_GetIsFailurePermanent) Return(_a0 bool) *MutableDynamicNodeStatus_GetIsFailurePermanent { + return &MutableDynamicNodeStatus_GetIsFailurePermanent{Call: _m.Call.Return(_a0)} +} + +func (_m *MutableDynamicNodeStatus) OnGetIsFailurePermanent() *MutableDynamicNodeStatus_GetIsFailurePermanent { + c_call := _m.On("GetIsFailurePermanent") + return &MutableDynamicNodeStatus_GetIsFailurePermanent{Call: c_call} +} + +func (_m *MutableDynamicNodeStatus) OnGetIsFailurePermanentMatch(matchers ...interface{}) *MutableDynamicNodeStatus_GetIsFailurePermanent { + c_call := _m.On("GetIsFailurePermanent", matchers...) + return &MutableDynamicNodeStatus_GetIsFailurePermanent{Call: c_call} +} + +// GetIsFailurePermanent provides a mock function with given fields: +func (_m *MutableDynamicNodeStatus) GetIsFailurePermanent() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + type MutableDynamicNodeStatus_IsDirty struct { *mock.Call } @@ -158,3 +190,8 @@ func (_m *MutableDynamicNodeStatus) SetDynamicNodeReason(reason string) { func (_m *MutableDynamicNodeStatus) SetExecutionError(executionError *core.ExecutionError) { _m.Called(executionError) } + +// SetIsFailurePermanent provides a mock function with given fields: isFailurePermanent +func (_m *MutableDynamicNodeStatus) SetIsFailurePermanent(isFailurePermanent bool) { + _m.Called(isFailurePermanent) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 282add2389..14efd4f401 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -96,9 +96,10 @@ const ( type DynamicNodeStatus struct { MutableStruct - Phase DynamicNodePhase `json:"phase,omitempty"` - Reason string `json:"reason,omitempty"` - Error *ExecutionError `json:"error,omitempty"` + Phase DynamicNodePhase `json:"phase,omitempty"` + Reason string `json:"reason,omitempty"` + Error *ExecutionError `json:"error,omitempty"` + IsFailurePermanent bool `json:"permFailure,omitempty"` } func (in *DynamicNodeStatus) GetDynamicNodePhase() DynamicNodePhase { @@ -116,6 +117,10 @@ func (in *DynamicNodeStatus) GetExecutionError() *core.ExecutionError { return in.Error.ExecutionError } +func (in *DynamicNodeStatus) GetIsFailurePermanent() bool { + return in.IsFailurePermanent +} + func (in *DynamicNodeStatus) SetDynamicNodeReason(reason string) { if in.Reason != reason { in.SetDirty() @@ -138,6 +143,13 @@ func (in *DynamicNodeStatus) SetExecutionError(err *core.ExecutionError) { } } +func (in *DynamicNodeStatus) SetIsFailurePermanent(isFailurePermanent bool) { + if in.IsFailurePermanent != isFailurePermanent { + in.SetDirty() + in.IsFailurePermanent = isFailurePermanent + } +} + func (in *DynamicNodeStatus) Equals(o *DynamicNodeStatus) bool { if in == nil && o == nil { return true diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go index 88b2ff66cc..6166d8722b 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -281,8 +281,12 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, // As we do not support Failure Node, we can just return failure in this case return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)), - handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Dynamic workflow failed", Error: state.Err}, - nil + handler.DynamicNodeState{ + Phase: v1alpha1.DynamicNodePhaseFailing, + Reason: "Dynamic workflow failed", + Error: state.Err, + IsFailurePermanent: state.HasFailed(), + }, nil } if state.IsComplete() { diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler.go b/flytepropeller/pkg/controller/nodes/dynamic/handler.go index 1d35fb5023..f027cca0e4 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler.go @@ -202,11 +202,18 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod return trns, err } - // TODO: Use Execution Error for ds.Error type to propagate the recoverable flag and determine if the error is retryable. + // if DynamicNodeStatus is noted with permanent failures we report a non-recoverable failure + phaseInfoFailureFunc := handler.PhaseInfoRetryableFailure + phaseInfoFailureFuncErr := handler.PhaseInfoRetryableFailureErr + if ds.IsFailurePermanent { + phaseInfoFailureFunc = handler.PhaseInfoFailure + phaseInfoFailureFuncErr = handler.PhaseInfoFailureErr + } + if ds.Error != nil { - trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailureErr(ds.Error, nil)) + trns = handler.DoTransition(handler.TransitionTypeEphemeral, phaseInfoFailureFuncErr(ds.Error, nil)) } else { - trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(core.ExecutionError_UNKNOWN, "DynamicNodeFailing", ds.Reason, nil)) + trns = handler.DoTransition(handler.TransitionTypeEphemeral, phaseInfoFailureFunc(core.ExecutionError_UNKNOWN, "DynamicNodeFailing", ds.Reason, nil)) } case v1alpha1.DynamicNodePhaseParentFinalizing: if err := d.finalizeParentNode(ctx, nCtx); err != nil { diff --git a/flytepropeller/pkg/controller/nodes/handler/state.go b/flytepropeller/pkg/controller/nodes/handler/state.go index 2ca4fb015d..9688f4e33a 100644 --- a/flytepropeller/pkg/controller/nodes/handler/state.go +++ b/flytepropeller/pkg/controller/nodes/handler/state.go @@ -5,9 +5,8 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flytestdlib/storage" - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytestdlib/storage" ) // This is the legacy state structure that gets translated to node status @@ -31,9 +30,10 @@ type BranchNodeState struct { type DynamicNodePhase uint8 type DynamicNodeState struct { - Phase v1alpha1.DynamicNodePhase - Reason string - Error *core.ExecutionError + Phase v1alpha1.DynamicNodePhase + Reason string + Error *core.ExecutionError + IsFailurePermanent bool } type WorkflowNodeState struct { diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index 89347f79ba..843c3b1f9b 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -76,6 +76,7 @@ func (n nodeStateManager) GetDynamicNodeState() handler.DynamicNodeState { ds.Phase = dn.GetDynamicNodePhase() ds.Reason = dn.GetDynamicNodeReason() ds.Error = dn.GetExecutionError() + ds.IsFailurePermanent = dn.GetIsFailurePermanent() } return ds diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index ae615a9f30..306ab182a4 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -249,6 +249,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa t.SetDynamicNodePhase(n.d.Phase) t.SetDynamicNodeReason(n.d.Reason) t.SetExecutionError(n.d.Error) + t.SetIsFailurePermanent(n.d.IsFailurePermanent) } // Update branch node status