diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index e7a862a0a2..a47c7719da 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -81,9 +81,14 @@ func (in *BranchNodeStatus) Equals(other *BranchNodeStatus) bool { type DynamicNodePhase int const ( + // This is the default phase for a Dynamic Node execution. This also implies that the parent node is being executed DynamicNodePhaseNone DynamicNodePhase = iota + // This phase implies that all the sub-nodes are being executed DynamicNodePhaseExecuting + // This implies that the dynamic sub-nodes have failed and failure is being handled DynamicNodePhaseFailing + // This Phase implies that the Parent node is done but it needs to be finalized before progressing to the sub-nodes (or dynamically yielded nodes) + DynamicNodePhaseParentFinalizing ) type DynamicNodeStatus struct { diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler.go b/flytepropeller/pkg/controller/nodes/dynamic/handler.go index 9f3cfa6668..8531d1543f 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler.go @@ -84,7 +84,7 @@ func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevSt // directly to progress the dynamically generated workflow. logger.Infof(ctx, "future file detected, assuming dynamic node") // There is a futures file, so we need to continue running the node with the modified state - return trns.WithInfo(handler.PhaseInfoRunning(trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseExecuting}, nil + return trns.WithInfo(handler.PhaseInfoRunning(trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalizing}, nil } } @@ -129,6 +129,11 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n return trns, newState, nil } +// The State machine for a dynamic node is as follows +// DynamicNodePhaseNone: The parent node is being handled +// DynamicNodePhaseParentFinalizing: The parent node has completes successfully and sub-nodes exist (futures file found). Parent node is being finalized. +// DynamicNodePhaseExecuting: The parent node has completed and finalized successfully, the sub-nodes are being handled +// DynamicNodePhaseFailing: one or more of sub-nodes have failed and the failure is being handled func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { ds := nCtx.NodeStateReader().GetDynamicNodeState() var err error @@ -150,6 +155,12 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod } trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("DynamicNodeFailed", ds.Reason, nil)) + case v1alpha1.DynamicNodePhaseParentFinalizing: + if err := d.finalizeParentNode(ctx, nCtx); err != nil { + return handler.UnknownTransition, err + } + newState = handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseExecuting} + trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(trns.Info().GetInfo())) default: trns, newState, err = d.handleParentNode(ctx, ds, nCtx) if err != nil { @@ -189,6 +200,15 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node } } +func (d dynamicNodeTaskNodeHandler) finalizeParentNode(ctx context.Context, nCtx handler.NodeExecutionContext) error { + logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt()) + if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil { + logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.") + return err + } + return nil +} + // This is a weird method. We should always finalize before we set the dynamic parent node phase as complete? func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext) error { errs := make([]error, 0, 2) @@ -212,9 +232,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N // We should always finalize the parent node success or failure. // If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executiing // (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent. - logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt()) - if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil { - logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.") + if err := d.finalizeParentNode(ctx, nCtx); err != nil { errs = append(errs, err) } diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go index 07dadeb91a..b7b2ca4534 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go @@ -151,7 +151,7 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { {"running-non-parent", args{trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(i))}, want{p: handler.EPhaseRunning, info: i}}, {"retryfailure-non-parent", args{trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("x", "y", i))}, want{p: handler.EPhaseRetryableFailure, info: i}}, {"failure-non-parent", args{trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("x", "y", i))}, want{p: handler.EPhaseFailed, info: i}}, - {"success-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil))}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}}, + {"success-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil))}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseParentFinalizing}}, {"running-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(i))}, want{p: handler.EPhaseRunning, info: i}}, {"retryfailure-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("x", "y", i))}, want{p: handler.EPhaseRetryableFailure, info: i}}, {"failure-non-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("x", "y", i))}, want{p: handler.EPhaseFailed, info: i}}, @@ -160,7 +160,7 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { t.Run(tt.name, func(t *testing.T) { nCtx := createNodeContext("test") s := &dynamicNodeStateHolder{} - nCtx.On("NodeStateWriter").Return(s) + nCtx.OnNodeStateWriter().Return(s) if tt.args.isDynamic { f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb") assert.NoError(t, err) @@ -170,9 +170,9 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { h := &mocks.TaskNodeHandler{} n := &executorMocks.Node{} if tt.args.isErr { - h.On("Handle", mock.Anything, mock.Anything).Return(handler.UnknownTransition, fmt.Errorf("error")) + h.OnHandleMatch(mock.Anything, mock.Anything).Return(handler.UnknownTransition, fmt.Errorf("error")) } else { - h.On("Handle", mock.Anything, mock.Anything).Return(tt.args.trns, nil) + h.OnHandleMatch(mock.Anything, mock.Anything).Return(tt.args.trns, nil) } d := New(h, n, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) @@ -187,6 +187,127 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { } }) } + +} + +func Test_dynamicNodeHandler_Handle_ParentFinalize(t *testing.T) { + createNodeContext := func(ttype string) *nodeMocks.NodeExecutionContext { + wfExecID := &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + } + + nm := &nodeMocks.NodeExecutionMetadata{} + nm.On("GetAnnotations").Return(map[string]string{}) + nm.On("GetExecutionID").Return(v1alpha1.WorkflowExecutionIdentifier{ + WorkflowExecutionIdentifier: wfExecID, + }) + nm.On("GetK8sServiceAccount").Return("service-account") + nm.On("GetLabels").Return(map[string]string{}) + nm.On("GetNamespace").Return("namespace") + nm.On("GetOwnerID").Return(types.NamespacedName{Namespace: "namespace", Name: "name"}) + nm.On("GetOwnerReference").Return(v1.OwnerReference{ + Kind: "sample", + Name: "name", + }) + + taskID := &core.Identifier{} + tk := &core.TaskTemplate{ + Id: taskID, + Type: "test", + Metadata: &core.TaskMetadata{ + Discoverable: true, + }, + Interface: &core.TypedInterface{ + Outputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "x": { + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_BOOLEAN, + }, + }, + }, + }, + }, + }, + } + tr := &nodeMocks.TaskReader{} + tr.On("GetTaskID").Return(taskID) + tr.On("GetTaskType").Return(ttype) + tr.On("Read", mock.Anything).Return(tk, nil) + + ns := &flyteMocks.ExecutableNodeStatus{} + ns.On("GetDataDir").Return(storage.DataReference("data-dir")) + ns.On("GetOutputDir").Return(storage.DataReference("data-dir")) + + res := &v12.ResourceRequirements{} + n := &flyteMocks.ExecutableNode{} + n.On("GetResources").Return(res) + + dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + ir := &ioMocks.InputReader{} + nCtx := &nodeMocks.NodeExecutionContext{} + nCtx.On("NodeExecutionMetadata").Return(nm) + nCtx.On("Node").Return(n) + nCtx.On("InputReader").Return(ir) + nCtx.On("DataReferenceConstructor").Return(storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())) + nCtx.On("CurrentAttempt").Return(uint32(1)) + nCtx.On("TaskReader").Return(tr) + nCtx.On("MaxDatasetSizeBytes").Return(int64(1)) + nCtx.On("NodeStatus").Return(ns) + nCtx.On("NodeID").Return("n1") + nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnDataStore().Return(dataStore) + + r := &nodeMocks.NodeStateReader{} + r.On("GetDynamicNodeState").Return(handler.DynamicNodeState{ + Phase: v1alpha1.DynamicNodePhaseParentFinalizing, + }) + nCtx.OnNodeStateReader().Return(r) + + return nCtx + } + + t.Run("parent-finalize-success", func(t *testing.T) { + nCtx := createNodeContext("test") + s := &dynamicNodeStateHolder{ + s: handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalizing}, + } + nCtx.OnNodeStateWriter().Return(s) + f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb") + assert.NoError(t, err) + dj := &core.DynamicJobSpec{} + n := &executorMocks.Node{} + assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + h := &mocks.TaskNodeHandler{} + h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(nil) + d := New(h, n, promutils.NewTestScope()) + got, err := d.Handle(context.TODO(), nCtx) + assert.NoError(t, err) + assert.Equal(t, handler.EPhaseRunning.String(), got.Info().GetPhase().String()) + }) + + t.Run("parent-finalize-error", func(t *testing.T) { + nCtx := createNodeContext("test") + s := &dynamicNodeStateHolder{ + s: handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalizing}, + } + nCtx.OnNodeStateWriter().Return(s) + f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb") + assert.NoError(t, err) + dj := &core.DynamicJobSpec{} + n := &executorMocks.Node{} + assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + h := &mocks.TaskNodeHandler{} + h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(fmt.Errorf("err")) + d := New(h, n, promutils.NewTestScope()) + _, err = d.Handle(context.TODO(), nCtx) + assert.Error(t, err) + }) } func createDynamicJobSpec() *core.DynamicJobSpec {