Skip to content

Commit

Permalink
Bug/Feature: Dynamic parent node should be finalized earlier (flyteor…
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Mar 24, 2020
1 parent dcd3156 commit c583db7
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 8 deletions.
5 changes: 5 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ func (in *BranchNodeStatus) Equals(other *BranchNodeStatus) bool {
// DynamicNodePhase identifies the phase of a dynamic node's execution.
type DynamicNodePhase int

const (
// DynamicNodePhaseNone is the default phase for a dynamic node execution (it is the zero value).
// It also implies that the parent node is being executed.
DynamicNodePhaseNone DynamicNodePhase = iota
// DynamicNodePhaseExecuting implies that all the sub-nodes are being executed.
DynamicNodePhaseExecuting
// DynamicNodePhaseFailing implies that the dynamic sub-nodes have failed and the failure is being handled.
DynamicNodePhaseFailing
// DynamicNodePhaseParentFinalizing implies that the parent node is done but needs to be finalized
// before progressing to the sub-nodes (the dynamically yielded nodes).
DynamicNodePhaseParentFinalizing
)

type DynamicNodeStatus struct {
Expand Down
26 changes: 22 additions & 4 deletions flytepropeller/pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevSt
// directly to progress the dynamically generated workflow.
logger.Infof(ctx, "future file detected, assuming dynamic node")
// There is a futures file, so we need to continue running the node with the modified state
return trns.WithInfo(handler.PhaseInfoRunning(trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseExecuting}, nil
return trns.WithInfo(handler.PhaseInfoRunning(trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalizing}, nil
}
}

Expand Down Expand Up @@ -129,6 +129,11 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
return trns, newState, nil
}

// The State machine for a dynamic node is as follows
// DynamicNodePhaseNone: The parent node is being handled
// DynamicNodePhaseParentFinalizing: The parent node has completed successfully and sub-nodes exist (futures file found). Parent node is being finalized.
// DynamicNodePhaseExecuting: The parent node has completed and finalized successfully, the sub-nodes are being handled
// DynamicNodePhaseFailing: one or more of sub-nodes have failed and the failure is being handled
func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
ds := nCtx.NodeStateReader().GetDynamicNodeState()
var err error
Expand All @@ -150,6 +155,12 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod
}

trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("DynamicNodeFailed", ds.Reason, nil))
case v1alpha1.DynamicNodePhaseParentFinalizing:
if err := d.finalizeParentNode(ctx, nCtx); err != nil {
return handler.UnknownTransition, err
}
newState = handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseExecuting}
trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(trns.Info().GetInfo()))
default:
trns, newState, err = d.handleParentNode(ctx, ds, nCtx)
if err != nil {
Expand Down Expand Up @@ -189,6 +200,15 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node
}
}

// finalizeParentNode finalizes the parent node of a dynamic node by delegating to the Finalize
// method of d.TaskNodeHandler. The current retry attempt is logged before the call. When the
// delegate fails, an error line is logged and the delegate's error is returned unchanged;
// otherwise nil is returned.
func (d dynamicNodeTaskNodeHandler) finalizeParentNode(ctx context.Context, nCtx handler.NodeExecutionContext) error {
	logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt())

	finalizeErr := d.TaskNodeHandler.Finalize(ctx, nCtx)
	if finalizeErr == nil {
		return nil
	}

	logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
	return finalizeErr
}

// This is a weird method. We should always finalize before we set the dynamic parent node phase as complete?
func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext) error {
errs := make([]error, 0, 2)
Expand All @@ -212,9 +232,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N
// We should always finalize the parent node success or failure.
// If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executing
// (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent.
logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt())
if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil {
logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
if err := d.finalizeParentNode(ctx, nCtx); err != nil {
errs = append(errs, err)
}

Expand Down
129 changes: 125 additions & 4 deletions flytepropeller/pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) {
{"running-non-parent", args{trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(i))}, want{p: handler.EPhaseRunning, info: i}},
{"retryfailure-non-parent", args{trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("x", "y", i))}, want{p: handler.EPhaseRetryableFailure, info: i}},
{"failure-non-parent", args{trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("x", "y", i))}, want{p: handler.EPhaseFailed, info: i}},
{"success-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil))}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
{"success-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil))}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseParentFinalizing}},
{"running-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(i))}, want{p: handler.EPhaseRunning, info: i}},
{"retryfailure-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("x", "y", i))}, want{p: handler.EPhaseRetryableFailure, info: i}},
{"failure-non-parent", args{isDynamic: true, trns: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("x", "y", i))}, want{p: handler.EPhaseFailed, info: i}},
Expand All @@ -160,7 +160,7 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
nCtx := createNodeContext("test")
s := &dynamicNodeStateHolder{}
nCtx.On("NodeStateWriter").Return(s)
nCtx.OnNodeStateWriter().Return(s)
if tt.args.isDynamic {
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb")
assert.NoError(t, err)
Expand All @@ -170,9 +170,9 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) {
h := &mocks.TaskNodeHandler{}
n := &executorMocks.Node{}
if tt.args.isErr {
h.On("Handle", mock.Anything, mock.Anything).Return(handler.UnknownTransition, fmt.Errorf("error"))
h.OnHandleMatch(mock.Anything, mock.Anything).Return(handler.UnknownTransition, fmt.Errorf("error"))
} else {
h.On("Handle", mock.Anything, mock.Anything).Return(tt.args.trns, nil)
h.OnHandleMatch(mock.Anything, mock.Anything).Return(tt.args.trns, nil)
}
d := New(h, n, promutils.NewTestScope())
got, err := d.Handle(context.TODO(), nCtx)
Expand All @@ -187,6 +187,127 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) {
}
})
}

}

// Test_dynamicNodeHandler_Handle_ParentFinalize exercises Handle when the stored dynamic node
// state is DynamicNodePhaseParentFinalizing. In that phase Handle is expected to finalize the
// parent task node: when the finalizer succeeds the returned transition is in the Running phase,
// and when the finalizer fails the error is surfaced to the caller.
func Test_dynamicNodeHandler_Handle_ParentFinalize(t *testing.T) {
	// createNodeContext builds a mocked NodeExecutionContext whose state reader reports
	// DynamicNodePhaseParentFinalizing. It takes the running *testing.T so that setup failures
	// are attributed to the subtest that triggered them.
	createNodeContext := func(t *testing.T, ttype string) *nodeMocks.NodeExecutionContext {
		wfExecID := &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		}

		nm := &nodeMocks.NodeExecutionMetadata{}
		nm.On("GetAnnotations").Return(map[string]string{})
		nm.On("GetExecutionID").Return(v1alpha1.WorkflowExecutionIdentifier{
			WorkflowExecutionIdentifier: wfExecID,
		})
		nm.On("GetK8sServiceAccount").Return("service-account")
		nm.On("GetLabels").Return(map[string]string{})
		nm.On("GetNamespace").Return("namespace")
		nm.On("GetOwnerID").Return(types.NamespacedName{Namespace: "namespace", Name: "name"})
		nm.On("GetOwnerReference").Return(v1.OwnerReference{
			Kind: "sample",
			Name: "name",
		})

		taskID := &core.Identifier{}
		tk := &core.TaskTemplate{
			Id:   taskID,
			Type: "test",
			Metadata: &core.TaskMetadata{
				Discoverable: true,
			},
			Interface: &core.TypedInterface{
				Outputs: &core.VariableMap{
					Variables: map[string]*core.Variable{
						"x": {
							Type: &core.LiteralType{
								Type: &core.LiteralType_Simple{
									Simple: core.SimpleType_BOOLEAN,
								},
							},
						},
					},
				},
			},
		}
		tr := &nodeMocks.TaskReader{}
		tr.On("GetTaskID").Return(taskID)
		tr.On("GetTaskType").Return(ttype)
		tr.On("Read", mock.Anything).Return(tk, nil)

		ns := &flyteMocks.ExecutableNodeStatus{}
		ns.On("GetDataDir").Return(storage.DataReference("data-dir"))
		ns.On("GetOutputDir").Return(storage.DataReference("data-dir"))

		res := &v12.ResourceRequirements{}
		n := &flyteMocks.ExecutableNode{}
		n.On("GetResources").Return(res)

		dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)

		ir := &ioMocks.InputReader{}
		nCtx := &nodeMocks.NodeExecutionContext{}
		nCtx.On("NodeExecutionMetadata").Return(nm)
		nCtx.On("Node").Return(n)
		nCtx.On("InputReader").Return(ir)
		// Reuse the store created (and error-checked) above. Passing the two-value result of a
		// second storage.NewDataStore call straight into Return would hand the mock an unchecked
		// error as a stray second return value.
		nCtx.On("DataReferenceConstructor").Return(dataStore)
		nCtx.On("CurrentAttempt").Return(uint32(1))
		nCtx.On("TaskReader").Return(tr)
		nCtx.On("MaxDatasetSizeBytes").Return(int64(1))
		nCtx.On("NodeStatus").Return(ns)
		nCtx.On("NodeID").Return("n1")
		nCtx.On("EnqueueOwner").Return(nil)
		nCtx.OnDataStore().Return(dataStore)

		r := &nodeMocks.NodeStateReader{}
		r.On("GetDynamicNodeState").Return(handler.DynamicNodeState{
			Phase: v1alpha1.DynamicNodePhaseParentFinalizing,
		})
		nCtx.OnNodeStateReader().Return(r)

		return nCtx
	}

	tests := []struct {
		name string
		// finalizeErr is what the mocked parent TaskNodeHandler.Finalize returns; a non-nil
		// value means Handle is expected to fail.
		finalizeErr error
	}{
		{name: "parent-finalize-success"},
		{name: "parent-finalize-error", finalizeErr: fmt.Errorf("err")},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			nCtx := createNodeContext(t, "test")
			s := &dynamicNodeStateHolder{
				s: handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalizing},
			}
			nCtx.OnNodeStateWriter().Return(s)

			// Write an (empty) futures file into the node's data dir, as the original subtests did.
			f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb")
			assert.NoError(t, err)
			dj := &core.DynamicJobSpec{}
			assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj))

			n := &executorMocks.Node{}
			h := &mocks.TaskNodeHandler{}
			h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(tt.finalizeErr)

			d := New(h, n, promutils.NewTestScope())
			got, err := d.Handle(context.TODO(), nCtx)
			if tt.finalizeErr != nil {
				assert.Error(t, err)
				return
			}
			assert.NoError(t, err)
			assert.Equal(t, handler.EPhaseRunning.String(), got.Info().GetPhase().String())
		})
	}
}

func createDynamicJobSpec() *core.DynamicJobSpec {
Expand Down

0 comments on commit c583db7

Please sign in to comment.