diff --git a/cmd/controller/cmd/root.go b/cmd/controller/cmd/root.go index 315df65335..47368a4e47 100644 --- a/cmd/controller/cmd/root.go +++ b/cmd/controller/cmd/root.go @@ -5,8 +5,11 @@ import ( "flag" "fmt" "os" + "runtime/pprof" "strings" + "github.com/lyft/flytestdlib/contextutils" + "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -215,10 +218,24 @@ func executeRootCmd(cfg *config2.Config) { logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err) } - c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope) + // Start controller runtime manager to start listening to resource changes. + // K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer + // EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect + // workflow changes faster than the default sync interval for workflow CRDs. + go func(ctx context.Context) { + ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager") + pprof.SetGoroutineLabels(ctx) + logger.Infof(ctx, "Starting controller-runtime manager") + err := mgr.Start(ctx.Done()) + if err != nil { + logger.Fatalf(ctx, "Failed to start manager. Error: %v", err) + } + }(ctx) + c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope) if err != nil { logger.Fatalf(ctx, "Failed to start Controller - [%v]", err.Error()) + return } else if c == nil { logger.Fatalf(ctx, "Failed to start Controller, nil controller received.") } diff --git a/go.mod b/go.mod index fd9f7bb6de..9ab3eab8e5 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/jmespath/go-jmespath v0.3.0 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.24 - github.com/lyft/flyteplugins v0.3.23 + github.com/lyft/flyteplugins v0.3.28 github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect diff --git a/go.sum b/go.sum index 4cccd6b784..874dce4345 100644 --- a/go.sum +++ b/go.sum @@ -393,6 +393,8 @@ github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0 github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.23 h1:cN6d6f1ZkoHw+HD4wFCSVFVv+sCSeyx13E+hXIYEDzo= github.com/lyft/flyteplugins v0.3.23/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= +github.com/lyft/flyteplugins v0.3.28 h1:4YSjJyQUHFtVoQio4X3wYtS7WRIGdJQf9Wtcl75e+1w= +github.com/lyft/flyteplugins v0.3.28/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE= github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= @@ -629,6 +631,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= +golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y= golang.org/x/exp 
v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -802,6 +805,7 @@ gomodules.xyz/jsonpatch/v2 v2.1.0/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3m gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485 h1:OB/uP/Puiu5vS5QMRPrXCDWUPb+kt8f1KW8oQzFejQw= gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= +gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e h1:jRyg0XfpwWlhEV8mDfdNGBeSJM2fuyh9Yjrnd8kF2Ts= gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e/go.mod h1:kS+toOQn6AQKjmKJ7gzohV1XkqsFehRA2FbsbkopSuQ= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= diff --git a/hack/tools.go b/hack/tools.go index 88ecb2605c..5b96173b64 100644 --- a/hack/tools.go +++ b/hack/tools.go @@ -2,4 +2,4 @@ package tools // Uncomment this to make code-generator work -//import _ "k8s.io/code-generator" +// import _ "k8s.io/code-generator" diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 4d490ef2a2..de213a7762 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -99,6 +99,15 @@ const ( WorkflowPhaseFailing WorkflowPhaseFailed WorkflowPhaseAborted + // WorkflowPhaseHandlingFailureNode is the phase the workflow will enter when a failure is detected in the workflow, + // the workflow has finished cleaning up (aborted running nodes... etc.) and a failure node is declared in the + // workflow spec. We enter this explicit phase to ensure we do not attempt to repeatedly clean up old nodes + // when handling a workflow event, which might lead to seemingly random failures. This phase ensures we are handling + // the failure node, and only the failure node, until it's done executing or it fails itself. + // If a failure node fails to execute (a real possibility), the final failure output of the workflow will only include + // its failure reason. In other words, its failure will mask the original failure for the workflow. It's imperative + // that failure nodes be very simple, very resilient and very well tested.
+ WorkflowPhaseHandlingFailureNode ) func (p WorkflowPhase) String() string { @@ -117,6 +126,8 @@ func (p WorkflowPhase) String() string { return "Succeeding" case WorkflowPhaseAborted: return "Aborted" + case WorkflowPhaseHandlingFailureNode: + return "HandlingFailureNode" } return "Unknown" } @@ -198,12 +209,14 @@ type ExecutableBranchNode interface { type ExecutableWorkflowNodeStatus interface { GetWorkflowNodePhase() WorkflowNodePhase + GetExecutionError() *core.ExecutionError } type MutableWorkflowNodeStatus interface { Mutable ExecutableWorkflowNodeStatus SetWorkflowNodePhase(phase WorkflowNodePhase) + SetExecutionError(executionError *core.ExecutionError) } type Mutable interface { diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go index 96f3bca5b9..949aa59a38 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableDynamicNodeStatus.go @@ -3,8 +3,10 @@ package mocks import ( - v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" mock "github.com/stretchr/testify/mock" + + v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) // ExecutableDynamicNodeStatus is an autogenerated mock type for the ExecutableDynamicNodeStatus type @@ -75,3 +77,37 @@ func (_m *ExecutableDynamicNodeStatus) GetDynamicNodeReason() string { return r0 } + +type ExecutableDynamicNodeStatus_GetExecutionError struct { + *mock.Call +} + +func (_m ExecutableDynamicNodeStatus_GetExecutionError) Return(_a0 *core.ExecutionError) *ExecutableDynamicNodeStatus_GetExecutionError { + return &ExecutableDynamicNodeStatus_GetExecutionError{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableDynamicNodeStatus) OnGetExecutionError() *ExecutableDynamicNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError") + return &ExecutableDynamicNodeStatus_GetExecutionError{Call: c} +} + +func (_m *ExecutableDynamicNodeStatus) OnGetExecutionErrorMatch(matchers ...interface{}) *ExecutableDynamicNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError", matchers...) 
+ return &ExecutableDynamicNodeStatus_GetExecutionError{Call: c} +} + +// GetExecutionError provides a mock function with given fields: +func (_m *ExecutableDynamicNodeStatus) GetExecutionError() *core.ExecutionError { + ret := _m.Called() + + var r0 *core.ExecutionError + if rf, ok := ret.Get(0).(func() *core.ExecutionError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.ExecutionError) + } + } + + return r0 +} diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflowNodeStatus.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflowNodeStatus.go index 6863baca51..586f043309 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflowNodeStatus.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflowNodeStatus.go @@ -3,8 +3,10 @@ package mocks import ( - v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" mock "github.com/stretchr/testify/mock" + + v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) // ExecutableWorkflowNodeStatus is an autogenerated mock type for the ExecutableWorkflowNodeStatus type @@ -12,6 +14,40 @@ type ExecutableWorkflowNodeStatus struct { mock.Mock } +type ExecutableWorkflowNodeStatus_GetExecutionError struct { + *mock.Call +} + +func (_m ExecutableWorkflowNodeStatus_GetExecutionError) Return(_a0 *core.ExecutionError) *ExecutableWorkflowNodeStatus_GetExecutionError { + return &ExecutableWorkflowNodeStatus_GetExecutionError{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableWorkflowNodeStatus) OnGetExecutionError() *ExecutableWorkflowNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError") + return &ExecutableWorkflowNodeStatus_GetExecutionError{Call: c} +} + +func (_m *ExecutableWorkflowNodeStatus) OnGetExecutionErrorMatch(matchers ...interface{}) *ExecutableWorkflowNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError", matchers...) 
+ return &ExecutableWorkflowNodeStatus_GetExecutionError{Call: c} +} + +// GetExecutionError provides a mock function with given fields: +func (_m *ExecutableWorkflowNodeStatus) GetExecutionError() *core.ExecutionError { + ret := _m.Called() + + var r0 *core.ExecutionError + if rf, ok := ret.Get(0).(func() *core.ExecutionError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.ExecutionError) + } + } + + return r0 +} + type ExecutableWorkflowNodeStatus_GetWorkflowNodePhase struct { *mock.Call } diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go index 70ccd23a1f..4cabde12b3 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go @@ -3,8 +3,10 @@ package mocks import ( - v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" mock "github.com/stretchr/testify/mock" + + v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) // MutableDynamicNodeStatus is an autogenerated mock type for the MutableDynamicNodeStatus type @@ -76,6 +78,40 @@ func (_m *MutableDynamicNodeStatus) GetDynamicNodeReason() string { return r0 } +type MutableDynamicNodeStatus_GetExecutionError struct { + *mock.Call +} + +func (_m MutableDynamicNodeStatus_GetExecutionError) Return(_a0 *core.ExecutionError) *MutableDynamicNodeStatus_GetExecutionError { + return &MutableDynamicNodeStatus_GetExecutionError{Call: _m.Call.Return(_a0)} +} + +func (_m *MutableDynamicNodeStatus) OnGetExecutionError() *MutableDynamicNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError") + return &MutableDynamicNodeStatus_GetExecutionError{Call: c} +} + +func (_m *MutableDynamicNodeStatus) OnGetExecutionErrorMatch(matchers ...interface{}) *MutableDynamicNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError", matchers...) 
+ return &MutableDynamicNodeStatus_GetExecutionError{Call: c} +} + +// GetExecutionError provides a mock function with given fields: +func (_m *MutableDynamicNodeStatus) GetExecutionError() *core.ExecutionError { + ret := _m.Called() + + var r0 *core.ExecutionError + if rf, ok := ret.Get(0).(func() *core.ExecutionError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.ExecutionError) + } + } + + return r0 +} + type MutableDynamicNodeStatus_IsDirty struct { *mock.Call } @@ -117,3 +153,8 @@ func (_m *MutableDynamicNodeStatus) SetDynamicNodePhase(phase v1alpha1.DynamicNo func (_m *MutableDynamicNodeStatus) SetDynamicNodeReason(reason string) { _m.Called(reason) } + +// SetExecutionError provides a mock function with given fields: executionError +func (_m *MutableDynamicNodeStatus) SetExecutionError(executionError *core.ExecutionError) { + _m.Called(executionError) +} diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableWorkflowNodeStatus.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableWorkflowNodeStatus.go index 33b19a0ccd..6e85e603ff 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableWorkflowNodeStatus.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableWorkflowNodeStatus.go @@ -3,8 +3,10 @@ package mocks import ( - v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" mock "github.com/stretchr/testify/mock" + + v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) // MutableWorkflowNodeStatus is an autogenerated mock type for the MutableWorkflowNodeStatus type @@ -12,6 +14,40 @@ type MutableWorkflowNodeStatus struct { mock.Mock } +type MutableWorkflowNodeStatus_GetExecutionError struct { + *mock.Call +} + +func (_m MutableWorkflowNodeStatus_GetExecutionError) Return(_a0 *core.ExecutionError) *MutableWorkflowNodeStatus_GetExecutionError { + return &MutableWorkflowNodeStatus_GetExecutionError{Call: _m.Call.Return(_a0)} +} + +func (_m *MutableWorkflowNodeStatus) OnGetExecutionError() *MutableWorkflowNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError") + return &MutableWorkflowNodeStatus_GetExecutionError{Call: c} +} + +func (_m *MutableWorkflowNodeStatus) OnGetExecutionErrorMatch(matchers ...interface{}) *MutableWorkflowNodeStatus_GetExecutionError { + c := _m.On("GetExecutionError", matchers...) 
+ return &MutableWorkflowNodeStatus_GetExecutionError{Call: c} +} + +// GetExecutionError provides a mock function with given fields: +func (_m *MutableWorkflowNodeStatus) GetExecutionError() *core.ExecutionError { + ret := _m.Called() + + var r0 *core.ExecutionError + if rf, ok := ret.Get(0).(func() *core.ExecutionError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.ExecutionError) + } + } + + return r0 +} + type MutableWorkflowNodeStatus_GetWorkflowNodePhase struct { *mock.Call } @@ -76,6 +112,11 @@ func (_m *MutableWorkflowNodeStatus) IsDirty() bool { return r0 } +// SetExecutionError provides a mock function with given fields: executionError +func (_m *MutableWorkflowNodeStatus) SetExecutionError(executionError *core.ExecutionError) { + _m.Called(executionError) +} + // SetWorkflowNodePhase provides a mock function with given fields: phase func (_m *MutableWorkflowNodeStatus) SetWorkflowNodePhase(phase v1alpha1.WorkflowNodePhase) { _m.Called(phase) diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 4a34d38adf..f74f619209 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -155,7 +155,19 @@ const ( type WorkflowNodeStatus struct { MutableStruct - Phase WorkflowNodePhase `json:"phase"` + Phase WorkflowNodePhase `json:"phase"` + ExecutionError *core.ExecutionError `json:"executionError"` +} + +func (in *WorkflowNodeStatus) SetExecutionError(executionError *core.ExecutionError) { + if in.ExecutionError != executionError { + in.SetDirty() + in.ExecutionError = executionError + } +} + +func (in *WorkflowNodeStatus) GetExecutionError() *core.ExecutionError { + return in.ExecutionError } func (in *WorkflowNodeStatus) GetWorkflowNodePhase() WorkflowNodePhase { diff --git a/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go index f15bf04bf4..c0e6eb32c2 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go @@ -5,6 +5,7 @@ package v1alpha1 import ( + core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" @@ -206,7 +207,8 @@ func (in *FlyteWorkflow) DeepCopyInto(out *FlyteWorkflow) { } in.Status.DeepCopyInto(&out.Status) if in.DataReferenceConstructor != nil { - // This was manually modified to not generated a deep copy constructor for this. There is no way to skip generation of fields + // This was manually modified to not generated a deep copy constructor for this. There is no way to skip + // generation of fields out.DataReferenceConstructor = in.DataReferenceConstructor } return @@ -503,7 +505,7 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { if in.WorkflowNodeStatus != nil { in, out := &in.WorkflowNodeStatus, &out.WorkflowNodeStatus *out = new(WorkflowNodeStatus) - **out = **in + (*in).DeepCopyInto(*out) } if in.TaskNodeStatus != nil { in, out := &in.TaskNodeStatus, &out.TaskNodeStatus @@ -519,7 +521,8 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { *out = (*in).DeepCopy() } if in.DataReferenceConstructor != nil { - // This was manually modified to not generated a deep copy constructor for this. There is no way to skip generation of fields + // This was manually modified to not generated a deep copy constructor for this. 
There is no way to skip + // generation of fields out.DataReferenceConstructor = in.DataReferenceConstructor } return @@ -625,6 +628,11 @@ func (in *WorkflowNodeSpec) DeepCopy() *WorkflowNodeSpec { func (in *WorkflowNodeStatus) DeepCopyInto(out *WorkflowNodeStatus) { *out = *in out.MutableStruct = in.MutableStruct + if in.ExecutionError != nil { + in, out := &in.ExecutionError, &out.ExecutionError + *out = new(core.ExecutionError) + **out = **in + } return } @@ -724,7 +732,8 @@ func (in *WorkflowStatus) DeepCopyInto(out *WorkflowStatus) { *out = (*in).DeepCopy() } if in.DataReferenceConstructor != nil { - // This was manually modified to not generated a deep copy constructor for this. There is no way to skip generation of fields + // This was manually modified to not generated a deep copy constructor for this. There is no way to skip + // generation of fields out.DataReferenceConstructor = in.DataReferenceConstructor } return diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 08a0a7f907..1ca1728906 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -67,18 +67,18 @@ func (p *Propeller) Initialize(ctx context.Context) error { // The return value should be an error, in the case, we wish to retry this workflow //
 //
-//     +--------+        +--------+        +--------+     +--------+
-//     |        |        |        |        |        |     |        |
-//     | Ready  +--------> Running+--------> Succeeding---> Success|
-//     |        |        |        |        |        |     |        |
-//     +--------+        +--------+        +---------     +--------+
+//     +--------+        +---------+        +------------+     +---------+
+//     |        |        |         |        |            |     |         |
+//     | Ready  +--------> Running +--------> Succeeding +-----> Success |
+//     |        |        |         |        |            |     |         |
+//     +--------+        +---------+        +------------+     +---------+
 //         |                  |
 //         |                  |
-//         |             +----v---+        +--------+
-//         |             |        |        |        |
-//         +-------------> Failing+--------> Failed |
-//                       |        |        |        |
-//                       +--------+        +--------+
+//         |             +----v----+        +---------------------+        +--------+
+//         |             |         |        |     (optional)      |        |        |
+//         +-------------> Failing +--------> HandlingFailureNode +--------> Failed |
+//                       |         |        |                     |        |        |
+//                       +---------+        +---------------------+        +--------+
 // 
func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { logger.Infof(ctx, "Processing Workflow.") diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index fe80509ff0..75d56749d3 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -592,12 +592,22 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, execContext ex return executors.NodeStatusComplete, nil } +func canHandleNode(phase v1alpha1.NodePhase) bool { + return phase == v1alpha1.NodePhaseNotYetStarted || + phase == v1alpha1.NodePhaseQueued || + phase == v1alpha1.NodePhaseRunning || + phase == v1alpha1.NodePhaseFailing || + phase == v1alpha1.NodePhaseTimingOut || + phase == v1alpha1.NodePhaseRetryableFailure || + phase == v1alpha1.NodePhaseSucceeding +} + func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) + nodePhase := nodeStatus.GetPhase() - switch nodeStatus.GetPhase() { - case v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseTimingOut, v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseSucceeding: + if canHandleNode(nodePhase) { // TODO Follow up Pull Request, // 1. Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along-with) the remaining arguments // 2. Create a new method called HandleNode (part of the interface) (remaining all args as the previous method, but no DAGStructure @@ -638,9 +648,9 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe // TODO we can optimize skip state handling by iterating down the graph and marking all as skipped // Currently we treat either Skip or Success the same way. In this approach only one node will be skipped // at a time. 
As we iterate down, further nodes will be skipped - case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseSkipped: + } else if nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseSkipped { return c.handleDownstream(ctx, execContext, dag, nl, currentNode) - case v1alpha1.NodePhaseFailed: + } else if nodePhase == v1alpha1.NodePhaseFailed { // This should not happen logger.Debugf(currentNodeCtx, "Node Failed") return executors.NodeStatusFailed(&core.ExecutionError{ @@ -648,7 +658,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe Message: "Node failed", Kind: core.ExecutionError_SYSTEM, }), nil - case v1alpha1.NodePhaseTimedOut: + } else if nodePhase == v1alpha1.NodePhaseTimedOut { logger.Debugf(currentNodeCtx, "Node Timed Out") return executors.NodeStatusTimedOut, nil } @@ -657,9 +667,9 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe func (c *nodeExecutor) FinalizeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) error { nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) + nodePhase := nodeStatus.GetPhase() - switch nodeStatus.GetPhase() { - case v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseSucceeding, v1alpha1.NodePhaseRetryableFailure: + if canHandleNode(nodePhase) { ctx = contextutils.WithNodeID(ctx, currentNode.GetID()) // Now depending on the node type decide @@ -677,7 +687,7 @@ func (c *nodeExecutor) FinalizeHandler(ctx context.Context, execContext executor if err != nil { return err } - default: + } else { // Abort downstream nodes downstreamNodes, err := dag.FromNode(currentNode.GetID()) if err != nil { @@ -710,9 +720,8 @@ func (c *nodeExecutor) FinalizeHandler(ctx context.Context, execContext executor func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error { nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) - - switch nodeStatus.GetPhase() { - case v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseSucceeding, v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseQueued: + nodePhase := nodeStatus.GetPhase() + if canHandleNode(nodePhase) { ctx = contextutils.WithNodeID(ctx, currentNode.GetID()) // Now depending on the node type decide @@ -749,7 +758,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E return errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") } } - case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseSkipped: + } else if nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseSkipped { // Abort downstream nodes downstreamNodes, err := dag.FromNode(currentNode.GetID()) if err != nil { @@ -775,10 +784,11 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E } return nil - default: + } else { ctx = contextutils.WithNodeID(ctx, currentNode.GetID()) logger.Warnf(ctx, "Trying to abort a node in state [%s]", nodeStatus.GetPhase().String()) } + return nil } diff --git a/pkg/controller/nodes/handler/ephase_enumer.go b/pkg/controller/nodes/handler/ephase_enumer.go new file mode 100644 index 0000000000..ed2318c031 --- /dev/null +++ b/pkg/controller/nodes/handler/ephase_enumer.go @@ -0,0 +1,58 @@ +// Code generated by "enumer --type=EPhase --trimprefix=EPhase"; 
DO NOT EDIT. + +// +package handler + +import ( + "fmt" +) + +const _EPhaseName = "UndefinedNotReadyQueuedRunningSkipFailedRetryableFailureSuccessTimedoutFailing" + +var _EPhaseIndex = [...]uint8{0, 9, 17, 23, 30, 34, 40, 56, 63, 71, 78} + +func (i EPhase) String() string { + if i >= EPhase(len(_EPhaseIndex)-1) { + return fmt.Sprintf("EPhase(%d)", i) + } + return _EPhaseName[_EPhaseIndex[i]:_EPhaseIndex[i+1]] +} + +var _EPhaseValues = []EPhase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + +var _EPhaseNameToValueMap = map[string]EPhase{ + _EPhaseName[0:9]: 0, + _EPhaseName[9:17]: 1, + _EPhaseName[17:23]: 2, + _EPhaseName[23:30]: 3, + _EPhaseName[30:34]: 4, + _EPhaseName[34:40]: 5, + _EPhaseName[40:56]: 6, + _EPhaseName[56:63]: 7, + _EPhaseName[63:71]: 8, + _EPhaseName[71:78]: 9, +} + +// EPhaseString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func EPhaseString(s string) (EPhase, error) { + if val, ok := _EPhaseNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to EPhase values", s) +} + +// EPhaseValues returns all values of the enum +func EPhaseValues() []EPhase { + return _EPhaseValues +} + +// IsAEPhase returns "true" if the value is listed in the enum definition. "false" otherwise +func (i EPhase) IsAEPhase() bool { + for _, v := range _EPhaseValues { + if i == v { + return true + } + } + return false +} diff --git a/pkg/controller/nodes/handler/state.go b/pkg/controller/nodes/handler/state.go index 19164155f0..071139bc61 100644 --- a/pkg/controller/nodes/handler/state.go +++ b/pkg/controller/nodes/handler/state.go @@ -36,6 +36,7 @@ type DynamicNodeState struct { type WorkflowNodeState struct { Phase v1alpha1.WorkflowNodePhase + Error *core.ExecutionError } type NodeStateWriter interface { diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index efe94efa95..bc7da55969 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ b/pkg/controller/nodes/handler/transition_info.go @@ -7,6 +7,8 @@ import ( "github.com/lyft/flytestdlib/storage" ) +//go:generate enumer --type=EPhase --trimprefix=EPhase + type EPhase uint8 const ( @@ -19,30 +21,9 @@ const ( EPhaseRetryableFailure EPhaseSuccess EPhaseTimedout + EPhaseFailing ) -func (p EPhase) String() string { - switch p { - case EPhaseNotReady: - return "not-ready" - case EPhaseQueued: - return "queued" - case EPhaseRunning: - return "running" - case EPhaseSkip: - return "skip" - case EPhaseFailed: - return "failed" - case EPhaseRetryableFailure: - return "retryable-fail" - case EPhaseSuccess: - return "success" - case EPhaseTimedout: - return "timedout" - } - return "undefined" -} - func (p EPhase) IsTerminal() bool { if p == EPhaseFailed || p == EPhaseSuccess || p == EPhaseSkip || p == EPhaseTimedout { return true @@ -148,6 +129,7 @@ func phaseInfoFailed(p EPhase, err *core.ExecutionError, info *ExecutionInfo) Ph Message: "Unknown error message", } } + return phaseInfo(p, err, info, err.Message) } @@ -159,6 +141,10 @@ func PhaseInfoFailureErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInf return phaseInfoFailed(EPhaseFailed, err, info) } +func PhaseInfoFailingErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInfo { + return phaseInfoFailed(EPhaseFailing, err, info) +} + func PhaseInfoRetryableFailure(kind core.ExecutionError_ErrorKind, code, reason string, info *ExecutionInfo) PhaseInfo { return phaseInfoFailed(EPhaseRetryableFailure, &core.ExecutionError{Kind: 
kind, Code: code, Message: reason}, info) } diff --git a/pkg/controller/nodes/handler/transition_info_test.go b/pkg/controller/nodes/handler/transition_info_test.go index d123cd07b1..ffcdf27976 100644 --- a/pkg/controller/nodes/handler/transition_info_test.go +++ b/pkg/controller/nodes/handler/transition_info_test.go @@ -8,34 +8,10 @@ import ( ) func TestPhaseInfoQueued(t *testing.T) { - p := PhaseInfoQueued("queued") + p := PhaseInfoQueued("Queued") assert.Equal(t, EPhaseQueued, p.p) } -func TestEPhase_String(t *testing.T) { - tests := []struct { - name string - p EPhase - }{ - {"queued", EPhaseQueued}, - {"not-ready", EPhaseNotReady}, - {"timedout", EPhaseTimedout}, - {"undefined", EPhaseUndefined}, - {"success", EPhaseSuccess}, - {"skip", EPhaseSkip}, - {"failed", EPhaseFailed}, - {"running", EPhaseRunning}, - {"retryable-fail", EPhaseRetryableFailure}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.p.String(); got != tt.name { - t.Errorf("String() = %v, want %v", got, tt.name) - } - }) - } -} - func TestEPhase_IsTerminal(t *testing.T) { tests := []struct { name string diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index 644991dbd8..4a4ee65772 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -49,11 +49,12 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu errors.BadSpecificationError, errMsg, nil)), nil } - updateNodeStateFn := func(transition handler.Transition, err error) (handler.Transition, error) { + updateNodeStateFn := func(transition handler.Transition, newPhase v1alpha1.WorkflowNodePhase, err error) (handler.Transition, error) { if err != nil { return transition, err } - workflowNodeState := handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting} + + workflowNodeState := handler.WorkflowNodeState{Phase: newPhase} err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if err != nil { logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error()) @@ -64,7 +65,8 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu } wfNode := nCtx.Node().GetWorkflowNode() - workflowPhase := nCtx.NodeStateReader().GetWorkflowNodeState().Phase + wfNodeState := nCtx.NodeStateReader().GetWorkflowNodeState() + workflowPhase := wfNodeState.Phase if workflowPhase == v1alpha1.WorkflowNodePhaseUndefined { if wfNode == nil { errMsg := "Invoked workflow handler, for a non workflow Node." 
@@ -72,18 +74,35 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu } if wfNode.GetSubWorkflowRef() != nil { - return updateNodeStateFn(w.subWfHandler.StartSubWorkflow(ctx, nCtx)) + trns, err := w.subWfHandler.StartSubWorkflow(ctx, nCtx) + return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err) } else if wfNode.GetLaunchPlanRefID() != nil { - return updateNodeStateFn(w.lpHandler.StartLaunchPlan(ctx, nCtx)) + trns, err := w.lpHandler.StartLaunchPlan(ctx, nCtx) + return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err) } return invalidWFNodeError() - } + } else if workflowPhase == v1alpha1.WorkflowNodePhaseExecuting { + if wfNode.GetSubWorkflowRef() != nil { + return w.subWfHandler.CheckSubWorkflowStatus(ctx, nCtx) + } else if wfNode.GetLaunchPlanRefID() != nil { + return w.lpHandler.CheckLaunchPlanStatus(ctx, nCtx) + } + } else if workflowPhase == v1alpha1.WorkflowNodePhaseFailing { + if wfNode == nil { + errMsg := "Invoked workflow handler, for a non workflow Node." + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, errMsg, nil)), nil + } - if wfNode.GetSubWorkflowRef() != nil { - return w.subWfHandler.CheckSubWorkflowStatus(ctx, nCtx) - } else if wfNode.GetLaunchPlanRefID() != nil { - return w.lpHandler.CheckLaunchPlanStatus(ctx, nCtx) + if wfNode.GetSubWorkflowRef() != nil { + trns, err := w.subWfHandler.HandleFailingSubWorkflow(ctx, nCtx) + return updateNodeStateFn(trns, workflowPhase, err) + } else if wfNode.GetLaunchPlanRefID() != nil { + // There is no failure node for launch plans, terminate immediately. + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(wfNodeState.Error, nil)), nil + } + + return invalidWFNodeError() } return invalidWFNodeError() diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index bcbcfe24ff..8c2b6e43b0 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/storage" @@ -48,6 +50,7 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx if startStatus.HasFailed() { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(startStatus.Err, nil)), nil } + return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl) } @@ -60,10 +63,14 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler } if state.HasFailed() { + workflowNodeState := handler.WorkflowNodeState{ + Phase: v1alpha1.WorkflowNodePhaseFailing, + Error: state.Err, + } + + err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if subworkflow.GetOnFailureNode() != nil { - // TODO Handle Failure node for subworkflows. 
We need to add new state to the executor so that, we can continue returning Running, but in the next round start executing DoInFailureHandling - NOTE1 - // https://github.com/lyft/flyte/issues/265 - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(state.Err, nil)), err + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(state.Err, nil)), err } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(state.Err, nil)), err @@ -112,36 +119,60 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } -// TODO related to NOTE1, this is not used currently, but should be used. For this we will need to clean up the state machine in the main handle function -// https://github.com/lyft/flyte/issues/265 func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { + originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error if subworkflow.GetOnFailureNode() != nil { state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, subworkflow.GetOnFailureNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } + if state.NodePhase == executors.NodePhaseRunning { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } if state.HasFailed() { + // If handling failure node resulted in failure, its failure will mask the original failure for the workflow + // TODO: Consider returning both errors. return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(state.Err, nil)), nil } - if state.IsComplete() { + if state.PartiallyComplete() { if err := nCtx.EnqueueOwnerFunc()(); err != nil { return handler.UnknownTransition, err } + + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(originalError, nil)), nil + } + + // When handling the failure node succeeds, the final status will still be failure + // we use the original error as the failure reason. 
+ if state.IsComplete() { + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr( + originalError, nil)), nil } - // state is } - // TODO we should use the error that we store in the state, this error should be the error that has caused the subworkflow to fail - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(&core.ExecutionError{ - Code: "FailureInSubWorkflow", - Message: "Failure in subworkflow missing in internal state", - Kind: core.ExecutionError_SYSTEM, // Should be USER ERROR once we have internal state - }, nil)), nil + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr( + originalError, nil)), nil +} + +func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { + subWorkflow, err := GetSubWorkflow(ctx, nCtx) + if err != nil { + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, err.Error(), nil)), nil + } + + status := nCtx.NodeStatus() + status.GetWorkflowNodeStatus() + if subWorkflow.GetOnFailureNode() == nil { + logger.Infof(ctx, "Subworkflow has no failure nodes, failing immediately.") + return handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoFailureErr(nCtx.NodeStateReader().GetWorkflowNodeState().Error, nil)), err + } + + nodeLookup := executors.NewNodeLookup(subWorkflow, status) + return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup) } func (s *subworkflowHandler) StartSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index e93ba96b71..c8131ee327 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -163,5 +163,6 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa if n.w != nil { t := s.GetOrCreateWorkflowStatus() t.SetWorkflowNodePhase(n.w.Phase) + t.SetExecutionError(n.w.Error) } } diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index a9e19d94bb..1ae102c80b 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -44,6 +44,10 @@ var StatusRunning = Status{TransitionToPhase: v1alpha1.WorkflowPhaseRunning} var StatusSucceeding = Status{TransitionToPhase: v1alpha1.WorkflowPhaseSucceeding} var StatusSuccess = Status{TransitionToPhase: v1alpha1.WorkflowPhaseSuccess} +func StatusFailureNode(originalErr *core.ExecutionError) Status { + return Status{TransitionToPhase: v1alpha1.WorkflowPhaseHandlingFailureNode, Err: originalErr} +} + func StatusFailing(err *core.ExecutionError) Status { return Status{TransitionToPhase: v1alpha1.WorkflowPhaseFailing, Err: err} } @@ -157,43 +161,63 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha return StatusRunning, nil } +func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { + execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) + errorNode := w.GetOnFailureNode() + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, errorNode) + if err != nil { + return StatusFailureNode(execErr), err + } + + if state.HasFailed() { + return StatusFailed(state.Err), nil + } + + if 
state.HasTimedOut() { + return StatusFailed(&core.ExecutionError{ + Kind: core.ExecutionError_USER, + Code: "TimedOut", + Message: "FailureNode Timed-out"}), nil + } + + if state.PartiallyComplete() { + // Re-enqueue the workflow + c.enqueueWorkflow(w.GetK8sWorkflowID().String()) + return StatusFailureNode(execErr), nil + } + + // If the failure node finished executing, transition to failed. + return StatusFailed(execErr), nil +} + +func executionErrorOrDefault(execError *core.ExecutionError, fallbackMessage string) *core.ExecutionError { + if execError == nil { + return &core.ExecutionError{ + Code: "UnknownError", + Message: fmt.Sprintf("Unknown error, last seen message [%s]", fallbackMessage), + Kind: core.ExecutionError_UNKNOWN, + } + } + + return execError +} + func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { + execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) + // Best effort clean-up. if err := c.cleanupRunningNodes(ctx, w, "Some node execution failed, auto-abort."); err != nil { - logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err) + logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", + w.ExecutionID.WorkflowExecutionIdentifier, err) + return StatusFailing(execErr), err } errorNode := w.GetOnFailureNode() if errorNode != nil { - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, errorNode) - if err != nil { - return StatusFailing(nil), err - } - if state.HasFailed() { - return StatusFailed(state.Err), nil - } - if state.HasTimedOut() { - return StatusFailed(&core.ExecutionError{ - Kind: core.ExecutionError_USER, - Code: "TimedOut", - Message: "FailureNode Timedout"}), nil - } - if state.PartiallyComplete() { - // Re-enqueue the workflow - c.enqueueWorkflow(w.GetK8sWorkflowID().String()) - return StatusFailing(nil), nil - } - // Fallthrough to handle state is complete + return StatusFailureNode(execErr), nil } - err := w.GetExecutionStatus().GetExecutionError() - if err == nil { - err = &core.ExecutionError{ - Code: "UnknownError", - Message: fmt.Sprintf("Unknown error, last seen message [%s]", w.GetExecutionStatus().GetMessage()), - Kind: core.ExecutionError_UNKNOWN, - } - } - return StatusFailed(err), nil + + return StatusFailed(execErr), nil } func (c *workflowExecutor) handleSucceedingWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) Status { @@ -250,6 +274,8 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W wfEvent.Phase = core.WorkflowExecution_RUNNING wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, fmt.Sprintf("Workflow Started"), nil) wfEvent.OccurredAt = utils.GetProtoTime(wStatus.GetStartedAt()) + case v1alpha1.WorkflowPhaseHandlingFailureNode: + fallthrough case v1alpha1.WorkflowPhaseFailing: wfEvent.Phase = core.WorkflowExecution_FAILING wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) @@ -372,6 +398,16 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1. 
} c.k8sRecorder.Event(w, corev1.EventTypeWarning, v1alpha1.WorkflowPhaseFailed.String(), "Workflow failed.") return nil + case v1alpha1.WorkflowPhaseHandlingFailureNode: + newStatus, err := c.handleFailureNode(ctx, w) + if err != nil { + return err + } + if err := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus); err != nil { + return err + } + c.k8sRecorder.Event(w, corev1.EventTypeWarning, v1alpha1.WorkflowPhaseFailed.String(), "Workflow failed.") + return nil default: return errors.Errorf(errors.IllegalStateError, w.ID, "Unsupported state [%s] for workflow", w.GetExecutionStatus().GetPhase().String()) }
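Reviewer note, not part of the patch: the new failure-node flow above (Failing -> HandlingFailureNode -> Failed, with the original execution error preserved while the failure node runs) can be summarized with the minimal, self-contained Go sketch below. All names here (phase, nodeResult, handleFailing, handleFailureNode) are hypothetical stand-ins for illustration only; they are not the flytepropeller APIs introduced by this diff.

package main

import (
	"errors"
	"fmt"
)

// phase is a hypothetical stand-in for v1alpha1.WorkflowPhase; only the states
// touched by this diff are modeled.
type phase int

const (
	phaseFailing phase = iota
	phaseHandlingFailureNode
	phaseFailed
)

// nodeResult mimics the small subset of node-execution state that the workflow
// executor inspects after evaluating the failure node.
type nodeResult struct {
	running bool
	err     error
}

// handleFailing sketches the failing-workflow step: after best-effort cleanup,
// the workflow moves to HandlingFailureNode only when a failure node is
// declared, otherwise it fails immediately.
func handleFailing(hasFailureNode bool) phase {
	if hasFailureNode {
		return phaseHandlingFailureNode
	}
	return phaseFailed
}

// handleFailureNode sketches the new phase: stay in it while the failure node
// runs; once it finishes, the workflow still ends up Failed and reports the
// original error, unless the failure node itself failed, in which case that
// error masks the original one.
func handleFailureNode(res nodeResult, originalErr error) (phase, error) {
	if res.running {
		return phaseHandlingFailureNode, originalErr
	}
	if res.err != nil {
		return phaseFailed, res.err
	}
	return phaseFailed, originalErr
}

func main() {
	fmt.Println(handleFailing(true) == phaseHandlingFailureNode) // true

	p, err := handleFailureNode(nodeResult{}, errors.New("node n1 failed"))
	fmt.Println(p == phaseFailed, err) // true node n1 failed
}

The property the sketch tries to capture is the one stated in the WorkflowPhaseHandlingFailureNode comment: entering an explicit phase means subsequent workflow events only re-evaluate the failure node, rather than re-running the cleanup path, and the workflow's final error is the original failure unless the failure node itself fails.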