diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index c1188ec26c..1006ef965e 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -15,6 +15,7 @@ var ( configSection = config.MustRegisterSection(configSectionKey, defaultConfig) defaultConfig = &Config{ + MaxWorkflowRetries: 5, MaxDatasetSizeBytes: 10 * 1024 * 1024, Queue: CompositeQueueConfig{ Type: CompositeQueueSimple, diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 88ba212e76..9f3cfa6668 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -171,7 +171,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node case v1alpha1.DynamicNodePhaseFailing: fallthrough case v1alpha1.DynamicNodePhaseExecuting: - logger.Infof(ctx, "Aborting dynamic workflow.") + logger.Infof(ctx, "Aborting dynamic workflow at RetryAttempt [%d]", nCtx.CurrentAttempt()) dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx) if err != nil { return err @@ -183,7 +183,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node return d.nodeExecutor.AbortHandler(ctx, dynamicWF, dynamicWF.StartNode(), reason) default: - logger.Infof(ctx, "Aborting regular node.") + logger.Infof(ctx, "Aborting regular node RetryAttempt [%d]", nCtx.CurrentAttempt()) // The parent node has not yet completed, so we will abort the parent node return d.TaskNodeHandler.Abort(ctx, nCtx, reason) } @@ -195,7 +195,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N ds := nCtx.NodeStateReader().GetDynamicNodeState() if ds.Phase == v1alpha1.DynamicNodePhaseFailing || ds.Phase == v1alpha1.DynamicNodePhaseExecuting { - logger.Infof(ctx, "Finalizing dynamic workflow") + logger.Infof(ctx, "Finalizing dynamic workflow RetryAttempt [%d]", nCtx.CurrentAttempt()) dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx) if err != nil { errs = append(errs, err) @@ -212,7 +212,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N // We should always finalize the parent node success or failure. // If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executiing // (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent. - logger.Infof(ctx, "Finalizing Parent node") + logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt()) if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil { logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.") errs = append(errs, err) diff --git a/pkg/controller/nodes/dynamic/handler_test.go b/pkg/controller/nodes/dynamic/handler_test.go index 65251616f7..07dadeb91a 100644 --- a/pkg/controller/nodes/dynamic/handler_test.go +++ b/pkg/controller/nodes/dynamic/handler_test.go @@ -455,6 +455,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { sr := &nodeMocks.NodeStateReader{} sr.OnGetDynamicNodeState().Return(s) nCtx.OnNodeStateReader().Return(sr) + nCtx.OnCurrentAttempt().Return(0) h := &mocks.TaskNodeHandler{} h.OnFinalize(ctx, nCtx).Return(nil) diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index c1afeb2de0..cc933d3ad1 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -374,6 +374,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork return executors.NodeStatusUndefined, err } + // NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state + // Attempt is used throughout the system to determine the idempotent resource version. + nodeStatus.IncrementAttempts() nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying") // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() @@ -399,7 +402,6 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork } if p.GetPhase() == handler.EPhaseRetryableFailure { - nodeStatus.IncrementAttempts() if p.GetErr() != nil && p.GetErr().GetKind() == core.ExecutionError_SYSTEM { nodeStatus.IncrementSystemFailures() } diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index f2b05ae5f4..bcef2bd900 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -739,9 +739,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { }, false, true, core.NodeExecution_FAILED, 0}, - {"(retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) { + {"retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) { return handler.UnknownTransition, fmt.Errorf("should not be invoked") - }, false, false, core.NodeExecution_RUNNING, 0}, + }, false, false, core.NodeExecution_RUNNING, 1}, {"running->failing", v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, executors.NodePhasePending, func() (handler.Transition, error) { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("code", "reason", nil)), nil diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index 9a485a466a..2b5e3852a8 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -35,7 +35,7 @@ func (w *workflowNodeHandler) FinalizeRequired() bool { return false } -func (w *workflowNodeHandler) Setup(ctx context.Context, setupContext handler.SetupContext) error { +func (w *workflowNodeHandler) Setup(_ context.Context, _ handler.SetupContext) error { return nil } @@ -94,17 +94,17 @@ func (w *workflowNodeHandler) Abort(ctx context.Context, nCtx handler.NodeExecut wf := nCtx.Workflow() wfNode := nCtx.Node().GetWorkflowNode() if wfNode.GetSubWorkflowRef() != nil { - return w.subWfHandler.HandleAbort(ctx, nCtx, wf, *wfNode.GetSubWorkflowRef()) + return w.subWfHandler.HandleAbort(ctx, nCtx, wf, *wfNode.GetSubWorkflowRef(), reason) } if wfNode.GetLaunchPlanRefID() != nil { - return w.lpHandler.HandleAbort(ctx, wf, nCtx.Node()) + return w.lpHandler.HandleAbort(ctx, wf, nCtx.Node(), reason) } return nil } -func (w *workflowNodeHandler) Finalize(ctx context.Context, executionContext handler.NodeExecutionContext) error { - logger.Debugf(ctx, "WorkflowNode::Finalizer: nothing to do") +func (w *workflowNodeHandler) Finalize(ctx context.Context, _ handler.NodeExecutionContext) error { + logger.Warnf(ctx, "Subworkflow finalize invoked. Nothing to be done") return nil } diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index 7ce9525ba8..90c1a98195 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -149,7 +149,7 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } -func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode) error { +func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, reason string) error { nodeStatus := w.GetNodeExecutionStatus(ctx, node.GetID()) childID, err := GetChildWorkflowExecutionID( w.GetExecutionID().WorkflowExecutionIdentifier, @@ -160,5 +160,5 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.Executab // THIS SHOULD NEVER HAPPEN return err } - return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("parent execution id [%s] aborted", w.GetName())) + return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("parent execution id [%s] aborted, reason [%s]", w.GetName(), reason)) } diff --git a/pkg/controller/nodes/subworkflow/launchplan_test.go b/pkg/controller/nodes/subworkflow/launchplan_test.go index 16ad0909c9..e5ddefd0ca 100644 --- a/pkg/controller/nodes/subworkflow/launchplan_test.go +++ b/pkg/controller/nodes/subworkflow/launchplan_test.go @@ -10,16 +10,17 @@ import ( "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/promutils" + "github.com/lyft/flytestdlib/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" mocks2 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" "github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" "github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks" "github.com/lyft/flytepropeller/pkg/utils" - "github.com/lyft/flytestdlib/promutils" - "github.com/lyft/flytestdlib/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) func createInmemoryStore(t testing.TB) *storage.DataStore { @@ -614,7 +615,6 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) { t.Run("abort-success", func(t *testing.T) { mockLPExec := &mocks.Executor{} - //mockStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, storage.NewDefaultProtobufStore(utils.FailingRawStore{}, promutils.NewTestScope())) mockLPExec.On("Kill", ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { @@ -626,14 +626,13 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) { h := launchPlanHandler{ launchPlan: mockLPExec, } - err := h.HandleAbort(ctx, mockWf, mockNode) + err := h.HandleAbort(ctx, mockWf, mockNode, "some reason") assert.NoError(t, err) }) t.Run("abort-fail", func(t *testing.T) { expectedErr := fmt.Errorf("fail") mockLPExec := &mocks.Executor{} - // mockStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, storage.NewDefaultProtobufStore(utils.FailingRawStore{}, promutils.NewTestScope())) mockLPExec.On("Kill", ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { @@ -645,7 +644,7 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) { h := launchPlanHandler{ launchPlan: mockLPExec, } - err := h.HandleAbort(ctx, mockWf, mockNode) + err := h.HandleAbort(ctx, mockWf, mockNode, "reason") assert.Error(t, err) assert.Equal(t, err, expectedErr) }) diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index 99ffe5b271..c11635b9cc 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -4,11 +4,12 @@ import ( "context" "fmt" + "github.com/lyft/flytestdlib/storage" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/executors" "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" - "github.com/lyft/flytestdlib/storage" ) // TODO Add unit tests for subworkflow handler @@ -174,7 +175,7 @@ func (s *subworkflowHandler) HandleSubWorkflowFailingNode(ctx context.Context, n return s.DoInFailureHandling(ctx, nCtx, contextualSubWorkflow) } -func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, w v1alpha1.ExecutableWorkflow, workflowID v1alpha1.WorkflowID) error { +func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, w v1alpha1.ExecutableWorkflow, workflowID v1alpha1.WorkflowID, reason string) error { subWorkflow := w.FindSubWorkflow(workflowID) if subWorkflow == nil { return fmt.Errorf("no sub workflow [%s] found in node [%s]", workflowID, nCtx.NodeID()) @@ -183,12 +184,12 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE nodeStatus := w.GetNodeExecutionStatus(ctx, nCtx.NodeID()) contextualSubWorkflow := executors.NewSubContextualWorkflow(w, subWorkflow, nodeStatus) - startNode := w.StartNode() + startNode := contextualSubWorkflow.StartNode() if startNode == nil { return fmt.Errorf("no sub workflow [%s] found in node [%s]", workflowID, nCtx.NodeID()) } - return s.nodeExecutor.AbortHandler(ctx, contextualSubWorkflow, startNode, "") + return s.nodeExecutor.AbortHandler(ctx, contextualSubWorkflow, startNode, reason) } func newSubworkflowHandler(nodeExecutor executors.Node) subworkflowHandler { diff --git a/pkg/controller/nodes/subworkflow/subworkflow_test.go b/pkg/controller/nodes/subworkflow/subworkflow_test.go new file mode 100644 index 0000000000..0b01e5e683 --- /dev/null +++ b/pkg/controller/nodes/subworkflow/subworkflow_test.go @@ -0,0 +1,78 @@ +package subworkflow + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + coreMocks "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + execMocks "github.com/lyft/flytepropeller/pkg/controller/executors/mocks" + "github.com/lyft/flytepropeller/pkg/controller/nodes/handler/mocks" +) + +func Test_subworkflowHandler_HandleAbort(t *testing.T) { + ctx := context.TODO() + + t.Run("missing-subworkflow", func(t *testing.T) { + nCtx := &mocks.NodeExecutionContext{} + nodeExec := &execMocks.Node{} + s := newSubworkflowHandler(nodeExec) + wf := &coreMocks.ExecutableWorkflow{} + wf.OnFindSubWorkflow("x").Return(nil) + nCtx.OnNodeID().Return("n1") + assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason")) + }) + + t.Run("missing-startNode", func(t *testing.T) { + nCtx := &mocks.NodeExecutionContext{} + nodeExec := &execMocks.Node{} + s := newSubworkflowHandler(nodeExec) + wf := &coreMocks.ExecutableWorkflow{} + st := &coreMocks.ExecutableNodeStatus{} + swf := &coreMocks.ExecutableSubWorkflow{} + wf.OnFindSubWorkflow("x").Return(swf) + wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st) + nCtx.OnNodeID().Return("n1") + swf.OnStartNode().Return(nil) + assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason")) + }) + + t.Run("abort-error", func(t *testing.T) { + nCtx := &mocks.NodeExecutionContext{} + nodeExec := &execMocks.Node{} + s := newSubworkflowHandler(nodeExec) + wf := &coreMocks.ExecutableWorkflow{} + st := &coreMocks.ExecutableNodeStatus{} + swf := &coreMocks.ExecutableSubWorkflow{} + wf.OnFindSubWorkflow("x").Return(swf) + wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st) + nCtx.OnNodeID().Return("n1") + n := &coreMocks.ExecutableNode{} + swf.OnStartNode().Return(n) + nodeExec.OnAbortHandler(ctx, wf, n, "reason").Return(fmt.Errorf("err")) + assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason")) + }) + + t.Run("abort-success", func(t *testing.T) { + nCtx := &mocks.NodeExecutionContext{} + nodeExec := &execMocks.Node{} + s := newSubworkflowHandler(nodeExec) + wf := &coreMocks.ExecutableWorkflow{} + st := &coreMocks.ExecutableNodeStatus{} + swf := &coreMocks.ExecutableSubWorkflow{} + wf.OnFindSubWorkflow("x").Return(swf) + wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st) + nCtx.OnNodeID().Return("n1") + n := &coreMocks.ExecutableNode{} + swf.OnStartNode().Return(n) + swf.OnGetID().Return("swf") + nodeExec.OnAbortHandlerMatch(mock.Anything, mock.MatchedBy(func(wf v1alpha1.ExecutableWorkflow) bool { + return wf.GetID() == swf.GetID() + }), n, mock.Anything).Return(nil) + assert.NoError(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason")) + }) +}