Skip to content

Commit

Permalink
Event Version Change for Launch plan Handler (flyteorg#211)
Browse files Browse the repository at this point in the history
* Event Version Change for Launch plan Handler

* comment
  • Loading branch information
anandswaminathan authored Dec 4, 2020
1 parent b5c3f49 commit 3381560
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 12 deletions.
86 changes: 82 additions & 4 deletions flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var wfExecID = &core.WorkflowExecutionIdentifier{
Name: "name",
}

func createNodeContext(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus) *mocks3.NodeExecutionContext {
func createNodeContextWithVersion(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus, version v1alpha1.EventVersion) *mocks3.NodeExecutionContext {

wfNodeState := handler.WorkflowNodeState{}
state := &workflowNodeStateHolder{s: wfNodeState}
Expand Down Expand Up @@ -94,9 +94,25 @@ func createNodeContext(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNo
})
nCtx.OnNodeStateReader().Return(nr)
nCtx.OnNodeStateWriter().Return(state)

ex := &execMocks.ExecutionContext{}
ex.OnGetEventVersion().Return(version)
ex.OnGetParentInfo().Return(nil)
ex.OnGetName().Return("name")

nCtx.OnExecutionContext().Return(ex)

return nCtx
}

func createNodeContextV1(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus) *mocks3.NodeExecutionContext {
return createNodeContextWithVersion(phase, n, s, v1alpha1.EventVersion1)
}

func createNodeContext(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus) *mocks3.NodeExecutionContext {
return createNodeContextWithVersion(phase, n, s, v1alpha1.EventVersion0)
}

func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) {
ctx := context.TODO()

Expand Down Expand Up @@ -124,7 +140,7 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) {
wfStatus := &mocks2.MutableWorkflowNodeStatus{}
mockNodeStatus.OnGetOrCreateWorkflowStatus().Return(wfStatus)

t.Run("happy", func(t *testing.T) {
t.Run("happy v0", func(t *testing.T) {

mockLPExec := &mocks.Executor{}
h := New(nil, mockLPExec, promutils.NewTestScope())
Expand All @@ -146,6 +162,29 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase())
})

t.Run("happy v1", func(t *testing.T) {

mockLPExec := &mocks.Executor{}
h := New(nil, mockLPExec, promutils.NewTestScope())
mockLPExec.OnLaunchMatch(
ctx,
mock.MatchedBy(func(o launchplan.LaunchContext) bool {
return o.ParentNodeExecution.NodeId == mockNode.GetID() &&
o.ParentNodeExecution.ExecutionId == wfExecID
}),
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain)
}),
mock.MatchedBy(func(o *core.Identifier) bool { return lpID == o }),
mock.MatchedBy(func(o *core.LiteralMap) bool { return o.Literals == nil }),
).Return(nil)

nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus)
s, err := h.Handle(ctx, nCtx)
assert.NoError(t, err)
assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase())
})
}

func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
Expand Down Expand Up @@ -175,7 +214,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
mockNodeStatus.OnGetAttempts().Return(attempts)
mockNodeStatus.OnGetDataDir().Return(dataDir)

t.Run("stillRunning", func(t *testing.T) {
t.Run("stillRunning V0", func(t *testing.T) {

mockLPExec := &mocks.Executor{}

Expand All @@ -194,6 +233,25 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase())
})
t.Run("stillRunning V1", func(t *testing.T) {

mockLPExec := &mocks.Executor{}

h := New(nil, mockLPExec, promutils.NewTestScope())
mockLPExec.OnGetStatusMatch(
ctx,
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain)
}),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, nil)

nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
s, err := h.Handle(ctx, nCtx)
assert.NoError(t, err)
assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase())
})
}

func TestWorkflowNodeHandler_AbortNode(t *testing.T) {
Expand Down Expand Up @@ -223,7 +281,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) {
mockNodeStatus.OnGetAttempts().Return(attempts)
mockNodeStatus.OnGetDataDir().Return(dataDir)

t.Run("abort", func(t *testing.T) {
t.Run("abort v0", func(t *testing.T) {

mockLPExec := &mocks.Executor{}
nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
Expand All @@ -244,6 +302,26 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) {
assert.NoError(t, err)
})

t.Run("abort v1", func(t *testing.T) {

mockLPExec := &mocks.Executor{}
nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)

h := New(nil, mockLPExec, promutils.NewTestScope())
mockLPExec.OnKillMatch(
ctx,
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain)
}),
mock.AnythingOfType(reflect.String.String()),
).Return(nil)

eCtx := &execMocks.ExecutionContext{}
nCtx.OnExecutionContext().Return(eCtx)
eCtx.OnGetName().Return("test")
err := h.Abort(ctx, nCtx, "test")
assert.NoError(t, err)
})
t.Run("abort-fail", func(t *testing.T) {

mockLPExec := &mocks.Executor{}
Expand Down
40 changes: 35 additions & 5 deletions flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/lyft/flytepropeller/pkg/controller/nodes/common"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/storage"
Expand All @@ -18,15 +20,36 @@ type launchPlanHandler struct {
launchPlan launchplan.Executor
}

func getParentNodeExecutionID(nCtx handler.NodeExecutionContext) (*core.NodeExecutionIdentifier, error) {
nodeExecID := &core.NodeExecutionIdentifier{
ExecutionId: nCtx.NodeExecutionMetadata().GetNodeExecutionID().ExecutionId,
}
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
var err error
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId)
if err != nil {
return nil, err
}
nodeExecID.NodeId = currentNodeUniqueID
} else {
nodeExecID.NodeId = nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId
}
return nodeExecID, nil
}

func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
nodeInputs, err := nCtx.InputReader().Get(ctx)
if err != nil {
errMsg := fmt.Sprintf("Failed to read input. Error [%s]", err)
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, errMsg, nil)), nil
}

parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
if err != nil {
return handler.UnknownTransition, err
}
childID, err := GetChildWorkflowExecutionID(
nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
parentNodeExecutionID,
nCtx.CurrentAttempt(),
)
if err != nil {
Expand All @@ -37,7 +60,7 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
// TODO we need to add principal and nestinglevel as annotations or labels?
Principal: "unknown",
NestingLevel: 0,
ParentNodeExecution: nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
ParentNodeExecution: parentNodeExecutionID,
}
err = l.launchPlan.Launch(ctx, launchCtx, childID, nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID().Identifier, nodeInputs)
if err != nil {
Expand All @@ -60,10 +83,13 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
}

func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {

parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
if err != nil {
return handler.UnknownTransition, err
}
// Handle launch plan
childID, err := GetChildWorkflowExecutionID(
nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
parentNodeExecutionID,
nCtx.CurrentAttempt(),
)

Expand Down Expand Up @@ -141,8 +167,12 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
}

func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, reason string) error {
parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
if err != nil {
return err
}
childID, err := GetChildWorkflowExecutionID(
nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
parentNodeExecutionID,
nCtx.CurrentAttempt(),
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,9 +613,6 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {
launchPlan: mockLPExec,
}
nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
eCtx := &execMocks.ExecutionContext{}
eCtx.OnGetName().Return("name")
nCtx.OnExecutionContext().Return(eCtx)
err := h.HandleAbort(ctx, nCtx, "reason")
assert.Error(t, err)
assert.Equal(t, err, expectedErr)
Expand Down

0 comments on commit 3381560

Please sign in to comment.