Skip to content

Commit

Permalink
Use GetExecutionData instead (flyteorg#573)
Browse files Browse the repository at this point in the history
* Use GetExecutionData instead

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* bump

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* fix unit tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* more fixes

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* fix more

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Regenerate

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Only call GetExecutionData when the workflow succeeds

Signed-off-by: Haytham Abuelfutuh <[email protected]>

---------

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Jun 13, 2023
1 parent eadf07b commit 89dfd5b
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 88 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/nodes/subworkflow/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
}),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, nil)
}, &core.LiteralMap{}, nil)

nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
s, err := h.Handle(ctx, nCtx)
Expand All @@ -271,7 +271,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
}),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, nil)
}, &core.LiteralMap{}, nil)

nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
s, err := h.Handle(ctx, nCtx)
Expand Down
21 changes: 5 additions & 16 deletions pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil
}

wfStatusClosure, err := l.launchPlan.GetStatus(ctx, childID)
wfStatusClosure, outputs, err := l.launchPlan.GetStatus(ctx, childID)
if err != nil {
if launchplan.IsNotFound(err) { // NotFound
errorCode, _ := errors.GetErrorCode(err)
Expand Down Expand Up @@ -198,22 +198,11 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
// TODO do we need to massage the output to match the alias or is the alias resolution done at the downstream consumer
// nCtx.Node().GetOutputAlias()
var oInfo *handler.OutputInfo
if wfStatusClosure.GetOutputs() != nil {
if outputs != nil {
outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
if wfStatusClosure.GetOutputs().GetUri() != "" {
uri := wfStatusClosure.GetOutputs().GetUri()
store := nCtx.DataStore()
err := store.CopyRaw(ctx, storage.DataReference(uri), outputFile, storage.Options{})
if err != nil {
logger.Warnf(ctx, "remote output for launchplan execution was not found, uri [%s], err %s", uri, err.Error())
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "remote output for launchplan execution was not found, uri [%s]", uri)
}
} else {
childOutput := wfStatusClosure.GetOutputs().GetValues()
if err := nCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, childOutput); err != nil {
logger.Debugf(ctx, "failed to write data to Storage, err: %v", err.Error())
return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow")
}
if err := nCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, outputs); err != nil {
logger.Debugf(ctx, "failed to write data to Storage, err: %v", err.Error())
return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow")
}

oInfo = &handler.OutputInfo{OutputURI: outputFile}
Expand Down
35 changes: 31 additions & 4 deletions pkg/controller/nodes/subworkflow/launchplan/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type executionCacheItem struct {
core.WorkflowExecutionIdentifier
ExecutionClosure *admin.ExecutionClosure
SyncError error
ExecutionOutputs *core.LiteralMap
}

func (e executionCacheItem) ID() string {
Expand Down Expand Up @@ -140,19 +141,19 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
return nil
}

func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, error) {
func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error) {
if executionID == nil {
return nil, fmt.Errorf("nil executionID")
return nil, nil, fmt.Errorf("nil executionID")
}

obj, err := a.cache.GetOrCreate(executionID.String(), executionCacheItem{WorkflowExecutionIdentifier: *executionID})
if err != nil {
return nil, err
return nil, nil, err
}

item := obj.(executionCacheItem)

return item.ExecutionClosure, item.SyncError
return item.ExecutionClosure, item.ExecutionOutputs, item.SyncError
}

func (a *adminLaunchPlanExecutor) GetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) (*admin.LaunchPlan, error) {
Expand Down Expand Up @@ -246,12 +247,38 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
continue
}

var outputs *core.LiteralMap
// Retrieve potential outputs only when the workflow succeeded.
// TODO: We can optimize further by only retrieving the outputs when the workflow has output variables in the
// interface.
if res.GetClosure().GetPhase() == core.WorkflowExecution_SUCCEEDED {
execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{
Id: &exec.WorkflowExecutionIdentifier,
})

if err != nil {
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
},
Action: cache.Update,
})

continue
}

outputs = execData.GetFullOutputs()
}

// Update the cache with the retrieved status
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
ExecutionClosure: res.Closure,
ExecutionOutputs: outputs,
},
Action: cache.Update,
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/nodes/subworkflow/launchplan/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(result, nil)
assert.NoError(t, err)
s, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id)
assert.NoError(t, err)
assert.Equal(t, result, s)
})
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
// Allow for sync to be called
time.Sleep(time.Second)

s, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id)
assert.Error(t, err)
assert.Nil(t, s)
assert.True(t, IsNotFound(err))
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
// Allow for sync to be called
time.Sleep(time.Second)

s, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id)
assert.Error(t, err)
assert.Nil(t, s)
assert.False(t, IsNotFound(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/subworkflow/launchplan/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Executor interface {
Launch(ctx context.Context, launchCtx LaunchContext, executionID *core.WorkflowExecutionIdentifier, launchPlanRef *core.Identifier, inputs *core.LiteralMap) error

// GetStatus retrieves status of a LaunchPlan execution
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, error)
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error)

// Kill a remote execution
Kill(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, reason string) error
Expand Down
23 changes: 16 additions & 7 deletions pkg/controller/nodes/subworkflow/launchplan/mocks/executor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 16 additions & 7 deletions pkg/controller/nodes/subworkflow/launchplan/mocks/flyte_admin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/controller/nodes/subworkflow/launchplan/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func (failFastWorkflowLauncher) Launch(ctx context.Context, launchCtx LaunchCont
return errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
}

func (failFastWorkflowLauncher) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, error) {
func (failFastWorkflowLauncher) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error) {
logger.Infof(ctx, "NOOP: Workflow Status ExecID [%s]", executionID.Name)
return nil, errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
return nil, nil, errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
}

func (failFastWorkflowLauncher) Kill(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, reason string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/subworkflow/launchplan/noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestFailFastWorkflowLauncher(t *testing.T) {
ctx := context.TODO()
f := NewFailFastLaunchPlanExecutor()
t.Run("getStatus", func(t *testing.T) {
a, err := f.GetStatus(ctx, &core.WorkflowExecutionIdentifier{
a, _, err := f.GetStatus(ctx, &core.WorkflowExecutionIdentifier{
Project: "p",
Domain: "d",
Name: "n",
Expand Down
Loading

0 comments on commit 89dfd5b

Please sign in to comment.