Skip to content

Commit

Permalink
Populate HasOutputs and ParentWorkflowID on cache miss (#295)
Browse files Browse the repository at this point in the history
## Overview
Fixes a bug introduced in https://github.com/unionai/flyte/pull/174/files#diff-620b44ef785082232823cd03fb7744c974fae0469bc0826340ac0e37f44af96cR276 and being investigated as part of https://unionai.atlassian.net/browse/CASE-643.

When rehydrating an `executionCacheItem` on a cache miss (propeller restart or pressure eviction), we weren't populating `HasOutputs` or `ParentWorkflowID`. These were assumed to be available on `syncItem()`. With `HasOutputs` set to `false` we skipped calling `adminClient.GetExecutionData()` and wrote a zero-length output file.

## Test Plan
Added test coverage that now succeeds with this change.

## Rollout Plan (if applicable)
Pick up in unionai/cloud and deploy

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed
  • Loading branch information
andrewwdye authored May 22, 2024
1 parent b05bdd4 commit 6d61571
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain)
}),
mock.MatchedBy(func(o v1alpha1.ExecutableLaunchPlan) bool { return true }),
mock.MatchedBy(func(o v1alpha1.WorkflowID) bool { return true }),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, &core.LiteralMap{}, nil)
Expand All @@ -281,6 +283,8 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain)
}),
mock.MatchedBy(func(o v1alpha1.ExecutableLaunchPlan) bool { return true }),
mock.MatchedBy(func(o v1alpha1.WorkflowID) bool { return true }),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, &core.LiteralMap{}, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx inte
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil
}

wfStatusClosure, outputs, err := l.launchPlan.GetStatus(ctx, childID)
launchPlanRefID := nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID()
launchPlan := nCtx.ExecutionContext().FindLaunchPlan(*launchPlanRefID)
if launchPlan == nil {
return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.BadSpecificationError, fmt.Sprintf("launch plan not found [%v]", launchPlanRefID), nil)), nil
}
wfStatusClosure, outputs, err := l.launchPlan.GetStatus(ctx, childID, launchPlan, nCtx.NodeExecutionMetadata().GetOwnerID().String())
if err != nil {
if launchplan.IsNotFound(err) { // NotFound
errorCode, _ := errors.GetErrorCode(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
}
}

hasOutputs := launchPlan.GetInterface() != nil && launchPlan.GetInterface().GetOutputs() != nil && len(launchPlan.GetInterface().GetOutputs().GetVariables()) > 0
hasOutputs := launchPlan.GetInterface() != nil && launchPlan.GetInterface().GetOutputs().GetVariables() != nil && len(launchPlan.GetInterface().GetOutputs().GetVariables()) > 0
_, err = a.cache.GetOrCreate(executionID.String(), executionCacheItem{
WorkflowExecutionIdentifier: *executionID,
HasOutputs: hasOutputs,
Expand All @@ -163,12 +163,18 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
return nil
}

func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error) {
func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier,
launchPlan v1alpha1.ExecutableLaunchPlan, parentWorkflowID v1alpha1.WorkflowID) (*admin.ExecutionClosure, *core.LiteralMap, error) {
if executionID == nil {
return nil, nil, fmt.Errorf("nil executionID")
}

obj, err := a.cache.GetOrCreate(executionID.String(), executionCacheItem{WorkflowExecutionIdentifier: *executionID})
hasOutputs := launchPlan.GetInterface() != nil && launchPlan.GetInterface().GetOutputs().GetVariables() != nil && len(launchPlan.GetInterface().GetOutputs().GetVariables()) > 0
obj, err := a.cache.GetOrCreate(executionID.String(), executionCacheItem{
WorkflowExecutionIdentifier: *executionID,
HasOutputs: hasOutputs,
ParentWorkflowID: parentWorkflowID,
})
if err != nil {
return nil, nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ import (
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"
)

var (
launchPlanWithOutputs = &core.LaunchPlanTemplate{
Id: &core.Identifier{},
Interface: &core.TypedInterface{
Inputs: &core.VariableMap{
Variables: map[string]*core.Variable{
"foo": {
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
},
},
Outputs: &core.VariableMap{
Variables: map[string]*core.Variable{
"bar": {
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
},
},
},
}
parentWorkflowID = "parentwf"
)

func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
ctx := context.TODO()
adminConfig := defaultAdminConfig
Expand All @@ -49,9 +72,22 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(result, nil)
assert.NoError(t, err)
s, _, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(
ctx,
id,
launchPlanWithOutputs,
parentWorkflowID,
)
assert.NoError(t, err)
assert.Equal(t, result, s)

item, err := exec.(*adminLaunchPlanExecutor).cache.Get(id.String())
assert.NoError(t, err)
assert.NotNil(t, item)
assert.IsType(t, executionCacheItem{}, item)
e := item.(executionCacheItem)
assert.True(t, e.HasOutputs)
assert.Equal(t, parentWorkflowID, e.ParentWorkflowID)
})

t.Run("notFound", func(t *testing.T) {
Expand Down Expand Up @@ -86,18 +122,16 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
},
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.NoError(t, err)

// Allow for sync to be called
time.Sleep(time.Second)

s, _, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id, launchPlanWithOutputs, parentWorkflowID)
assert.Error(t, err)
assert.Nil(t, s)
assert.True(t, IsNotFound(err))
Expand Down Expand Up @@ -135,18 +169,16 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
},
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.NoError(t, err)

// Allow for sync to be called
time.Sleep(time.Second)

s, _, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id, launchPlanWithOutputs, parentWorkflowID)
assert.Error(t, err)
assert.Nil(t, s)
assert.False(t, IsNotFound(err))
Expand Down Expand Up @@ -190,11 +222,9 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
},
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.NoError(t, err)
})
Expand Down Expand Up @@ -229,11 +259,9 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
ParentNodeExecution: parentNodeExecution,
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.NoError(t, err)
})
Expand Down Expand Up @@ -281,11 +309,9 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
ParentNodeExecution: parentNodeExecution,
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.NoError(t, err)
assert.True(t, createCalled)
Expand All @@ -312,11 +338,9 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
},
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.Error(t, err)
assert.True(t, IsAlreadyExists(err))
Expand All @@ -343,11 +367,9 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
},
},
id,
&core.LaunchPlanTemplate{
Id: &core.Identifier{},
},
launchPlanWithOutputs,
nil,
"",
parentWorkflowID,
)
assert.Error(t, err)
assert.False(t, IsAlreadyExists(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type Executor interface {
launchPlan v1alpha1.ExecutableLaunchPlan, inputs *core.LiteralMap, parentWorkflowID v1alpha1.WorkflowID) error

// GetStatus retrieves status of a LaunchPlan execution
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error)
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, launchPlan v1alpha1.ExecutableLaunchPlan,
parentWorkflowID v1alpha1.WorkflowID) (*admin.ExecutionClosure, *core.LiteralMap, error)

// Kill a remote execution
Kill(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, reason string) error
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func (failFastWorkflowLauncher) Launch(ctx context.Context, launchCtx LaunchCont
return errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
}

func (failFastWorkflowLauncher) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error) {
func (failFastWorkflowLauncher) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier,
launchPlan v1alpha1.ExecutableLaunchPlan, parentWorkflowID v1alpha1.WorkflowID) (*admin.ExecutionClosure, *core.LiteralMap, error) {
logger.Infof(ctx, "NOOP: Workflow Status ExecID [%s]", executionID.Name)
return nil, nil, errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func TestFailFastWorkflowLauncher(t *testing.T) {
Project: "p",
Domain: "d",
Name: "n",
})
}, &core.LaunchPlanTemplate{
Id: &core.Identifier{},
}, "",
)
assert.Nil(t, a)
assert.Error(t, err)
})
Expand Down
Loading

0 comments on commit 6d61571

Please sign in to comment.