Skip to content

Commit

Permalink
Use GetExecutionData instead (flyteorg#573)
Browse files Browse the repository at this point in the history
* Use GetExecutionData instead

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* bump

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* fix unit tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* more fixes

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* fix more

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Regenerate

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Only call GetExecutionData when the workflow succeeds

Signed-off-by: Haytham Abuelfutuh <[email protected]>

---------

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Jun 13, 2023
1 parent 977aaab commit 10efce7
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
}),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, nil)
}, &core.LiteralMap{}, nil)

nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
s, err := h.Handle(ctx, nCtx)
Expand All @@ -271,7 +271,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) {
}),
).Return(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
}, nil)
}, &core.LiteralMap{}, nil)

nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus)
s, err := h.Handle(ctx, nCtx)
Expand Down
21 changes: 5 additions & 16 deletions flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil
}

wfStatusClosure, err := l.launchPlan.GetStatus(ctx, childID)
wfStatusClosure, outputs, err := l.launchPlan.GetStatus(ctx, childID)
if err != nil {
if launchplan.IsNotFound(err) { // NotFound
errorCode, _ := errors.GetErrorCode(err)
Expand Down Expand Up @@ -198,22 +198,11 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
// TODO do we need to massage the output to match the alias or is the alias resolution done at the downstream consumer
// nCtx.Node().GetOutputAlias()
var oInfo *handler.OutputInfo
if wfStatusClosure.GetOutputs() != nil {
if outputs != nil {
outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
if wfStatusClosure.GetOutputs().GetUri() != "" {
uri := wfStatusClosure.GetOutputs().GetUri()
store := nCtx.DataStore()
err := store.CopyRaw(ctx, storage.DataReference(uri), outputFile, storage.Options{})
if err != nil {
logger.Warnf(ctx, "remote output for launchplan execution was not found, uri [%s], err %s", uri, err.Error())
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "remote output for launchplan execution was not found, uri [%s]", uri)
}
} else {
childOutput := wfStatusClosure.GetOutputs().GetValues()
if err := nCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, childOutput); err != nil {
logger.Debugf(ctx, "failed to write data to Storage, err: %v", err.Error())
return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow")
}
if err := nCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, outputs); err != nil {
logger.Debugf(ctx, "failed to write data to Storage, err: %v", err.Error())
return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow")
}

oInfo = &handler.OutputInfo{OutputURI: outputFile}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type executionCacheItem struct {
core.WorkflowExecutionIdentifier
ExecutionClosure *admin.ExecutionClosure
SyncError error
ExecutionOutputs *core.LiteralMap
}

func (e executionCacheItem) ID() string {
Expand Down Expand Up @@ -140,19 +141,19 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
return nil
}

func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, error) {
func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error) {
if executionID == nil {
return nil, fmt.Errorf("nil executionID")
return nil, nil, fmt.Errorf("nil executionID")
}

obj, err := a.cache.GetOrCreate(executionID.String(), executionCacheItem{WorkflowExecutionIdentifier: *executionID})
if err != nil {
return nil, err
return nil, nil, err
}

item := obj.(executionCacheItem)

return item.ExecutionClosure, item.SyncError
return item.ExecutionClosure, item.ExecutionOutputs, item.SyncError
}

func (a *adminLaunchPlanExecutor) GetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) (*admin.LaunchPlan, error) {
Expand Down Expand Up @@ -246,12 +247,38 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
continue
}

var outputs *core.LiteralMap
// Retrieve potential outputs only when the workflow succeeded.
// TODO: We can optimize further by only retrieving the outputs when the workflow has output variables in the
// interface.
if res.GetClosure().GetPhase() == core.WorkflowExecution_SUCCEEDED {
execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{
Id: &exec.WorkflowExecutionIdentifier,
})

if err != nil {
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
},
Action: cache.Update,
})

continue
}

outputs = execData.GetFullOutputs()
}

// Update the cache with the retrieved status
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
ExecutionClosure: res.Closure,
ExecutionOutputs: outputs,
},
Action: cache.Update,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(result, nil)
assert.NoError(t, err)
s, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id)
assert.NoError(t, err)
assert.Equal(t, result, s)
})
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
// Allow for sync to be called
time.Sleep(time.Second)

s, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id)
assert.Error(t, err)
assert.Nil(t, s)
assert.True(t, IsNotFound(err))
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
// Allow for sync to be called
time.Sleep(time.Second)

s, err := exec.GetStatus(ctx, id)
s, _, err := exec.GetStatus(ctx, id)
assert.Error(t, err)
assert.Nil(t, s)
assert.False(t, IsNotFound(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Executor interface {
Launch(ctx context.Context, launchCtx LaunchContext, executionID *core.WorkflowExecutionIdentifier, launchPlanRef *core.Identifier, inputs *core.LiteralMap) error

// GetStatus retrieves status of a LaunchPlan execution
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, error)
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error)

// Kill a remote execution
Kill(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, reason string) error
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func (failFastWorkflowLauncher) Launch(ctx context.Context, launchCtx LaunchCont
return errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
}

func (failFastWorkflowLauncher) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, error) {
func (failFastWorkflowLauncher) GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error) {
logger.Infof(ctx, "NOOP: Workflow Status ExecID [%s]", executionID.Name)
return nil, errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
return nil, nil, errors.Wrapf(RemoteErrorUser, fmt.Errorf("badly configured system"), "please enable admin workflow launch to use launchplans")
}

func (failFastWorkflowLauncher) Kill(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, reason string) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestFailFastWorkflowLauncher(t *testing.T) {
ctx := context.TODO()
f := NewFailFastLaunchPlanExecutor()
t.Run("getStatus", func(t *testing.T) {
a, err := f.GetStatus(ctx, &core.WorkflowExecutionIdentifier{
a, _, err := f.GetStatus(ctx, &core.WorkflowExecutionIdentifier{
Project: "p",
Domain: "d",
Name: "n",
Expand Down
Loading

0 comments on commit 10efce7

Please sign in to comment.