Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
More cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu committed Jun 21, 2022
1 parent 443785f commit ca14d63
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 51 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,5 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d

replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.5-0.20220616203903-4495ffbe4ba2
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v1.1.5 h1:awptNJfw2yESkdNOm1Pe9KPILAzVImkiViUFP1K7UPk=
github.com/flyteorg/flyteidl v1.1.5/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g=
github.com/flyteorg/flyteplugins v1.0.3 h1:InTz8oKGRcya7eUvTFtDC1v9eAIbu0qodzoslbf+LRI=
github.com/flyteorg/flyteplugins v1.0.3/go.mod h1:vVKWzbVl43Jr+AUrVYsU7s4O5YOd631C1bCsZ7mnzmU=
github.com/flyteorg/flyteplugins v1.0.5-0.20220616203903-4495ffbe4ba2 h1:fcUuqxtuAaYSLSbBOg9QQS7uwCFK4pooDvzCpKvjAWs=
github.com/flyteorg/flyteplugins v1.0.5-0.20220616203903-4495ffbe4ba2/go.mod h1:vVKWzbVl43Jr+AUrVYsU7s4O5YOd631C1bCsZ7mnzmU=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
github.com/flyteorg/flytestdlib v1.0.4 h1:pbidu/cpCY3/hxnAXwcNx0LoNiu6eQt58Dju+K+/yxA=
github.com/flyteorg/flytestdlib v1.0.4/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
Expand Down
12 changes: 6 additions & 6 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ type ExecutionTimeInfo interface {
GetLastUpdatedAt() *metav1.Time
}

// Interface for a Node p. This provides a mutable API.
// ExecutableNodeStatus is the interface for a Node's status. This provides a mutable API.
type ExecutableNodeStatus interface {
NodeStatusGetter
MutableNodeStatus
Expand Down Expand Up @@ -349,7 +349,7 @@ type MutableTaskNodeStatus interface {
SetBarrierClockTick(tick uint32)
}

// Interface for a Child Workflow Node
// ExecutableWorkflowNode is an interface for a Child Workflow Node
type ExecutableWorkflowNode interface {
GetLaunchPlanRefID() *LaunchPlanRefID
GetSubWorkflowRef() *WorkflowID
Expand All @@ -360,7 +360,7 @@ type BaseNode interface {
GetKind() NodeKind
}

// Interface for the Executable Node
// ExecutableNode is an interface for the Executable Node
type ExecutableNode interface {
BaseNode
IsStartNode() bool
Expand All @@ -379,7 +379,7 @@ type ExecutableNode interface {
GetName() string
}

// Interface for the Workflow p. This is the mutable portion for a Workflow
// ExecutableWorkflowStatus is an interface for the Workflow status. This is the mutable portion for a Workflow
type ExecutableWorkflowStatus interface {
NodeStatusGetter
ExecutionTimeInfo
Expand All @@ -405,7 +405,7 @@ type BaseWorkflow interface {
NodeGetter
StartNode() ExecutableNode
GetID() WorkflowID
// From returns all nodes that can be reached directly
// FromNode returns all nodes that can be reached directly
// from the node with the given unique name.
FromNode(name NodeID) ([]NodeID, error)
ToNode(name NodeID) ([]NodeID, error)
Expand All @@ -416,7 +416,7 @@ type BaseWorkflowWithStatus interface {
NodeStatusGetter
}

// This interface captures the methods available on any workflow (top level or child). The Meta section is available
// ExecutableSubWorkflow interface captures the methods available on any workflow (top level or child). The Meta section is available
// only for the top level workflow
type ExecutableSubWorkflow interface {
BaseWorkflow
Expand Down
8 changes: 2 additions & 6 deletions pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,11 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
}

if downstreamStatus.IsComplete() {
deckURI := v1alpha1.GetDeckFile(childNodeStatus.GetOutputDir())
metadata, err := nCtx.DataStore().Head(context.Background(), deckURI)
if err != nil || !metadata.Exists() {
deckURI = ""
}
// For branch node we set the output node to be the same as the child nodes output
phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{
OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir()), DeckURI: deckURI},
OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())},
})

return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,15 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
}

destinationPath := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
deckPath := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir())
metadata, err := nCtx.DataStore().Head(context.Background(), deckPath)
if err != nil || !metadata.Exists() {
deckPath = ""
}
if err := nCtx.DataStore().CopyRaw(ctx, sourcePath, destinationPath, storage.Options{}); err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "OutputsNotFound",
fmt.Sprintf("Failed to copy subworkflow outputs from [%v] to [%v]. Error: %s", sourcePath, destinationPath, err.Error()), nil),
), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Failed to copy subworkflow outputs"},
nil
}
o = &handler.OutputInfo{OutputURI: destinationPath, DeckURI: deckPath}

o = &handler.OutputInfo{OutputURI: destinationPath}
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{
Expand Down
23 changes: 16 additions & 7 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,32 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
} else {
logger.Debugf(ctx, "No outputs found for recovered node [%+v]", nCtx.NodeExecutionMetadata().GetNodeExecutionID())
}

outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
oi := &handler.OutputInfo{
OutputURI: outputFile,
}

deckFile := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir())
metadata, err := nCtx.DataStore().Head(context.Background(), deckFile)
if err != nil || !metadata.Exists() {
deckFile = ""
metadata, err := nCtx.DataStore().Head(ctx, deckFile)
if err != nil {
logger.Errorf(ctx, "Failed to check the existence of deck file. Error: %v", err)
return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to check the existence of deck file.")
}

if metadata.Exists() {
oi.DeckURI = &deckFile
}

if err := c.store.WriteProtobuf(ctx, outputFile, so, outputs); err != nil {
logger.Errorf(ctx, "Failed to write protobuf (metadata). Error [%v]", err)
return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to store recovered node execution outputs")
}

info := &handler.ExecutionInfo{
OutputInfo: &handler.OutputInfo{
OutputURI: outputFile,
DeckURI: deckFile,
},
OutputInfo: oi,
}

if recovered.Closure.GetTaskNodeMetadata() != nil {
taskNodeInfo := &handler.TaskNodeInfo{
TaskNodeMetadata: &event.TaskNodeMetadata{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/handler/transition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type TaskNodeInfo struct {

type OutputInfo struct {
OutputURI storage.DataReference
DeckURI storage.DataReference
DeckURI *storage.DataReference
}

type ExecutionInfo struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/nodes/handler/transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/stretchr/testify/assert"
)

// AsPointer copies val into freshly allocated storage and returns the address
// of that copy. It lets callers take a pointer to a literal or conversion
// expression inline, which Go does not permit with the & operator.
func AsPointer[T any](val T) *T {
	ptr := new(T)
	*ptr = val
	return ptr
}

func TestDoTransition(t *testing.T) {
t.Run("ephemeral", func(t *testing.T) {
tr := DoTransition(TransitionTypeEphemeral, PhaseInfoQueued("queued"))
Expand All @@ -16,7 +20,7 @@ func TestDoTransition(t *testing.T) {

t.Run("barrier", func(t *testing.T) {
tr := DoTransition(TransitionTypeBarrier, PhaseInfoSuccess(&ExecutionInfo{
OutputInfo: &OutputInfo{OutputURI: "uri", DeckURI: "deck"},
OutputInfo: &OutputInfo{OutputURI: "uri", DeckURI: AsPointer(storage.DataReference("deck"))},
}))
assert.Equal(t, TransitionTypeBarrier, tr.Type())
assert.Equal(t, EPhaseSuccess, tr.Info().p)
Expand Down
9 changes: 3 additions & 6 deletions pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
var oInfo *handler.OutputInfo
if wfStatusClosure.GetOutputs() != nil {
outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
deckFile := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir())
metadata, err := nCtx.DataStore().Head(context.Background(), deckFile)
if err != nil || !metadata.Exists() {
deckFile = ""
}
if wfStatusClosure.GetOutputs().GetUri() != "" {
uri := wfStatusClosure.GetOutputs().GetUri()
store := nCtx.DataStore()
Expand All @@ -190,8 +185,10 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "failed to copy outputs for child workflow")
}
}
oInfo = &handler.OutputInfo{OutputURI: outputFile, DeckURI: deckFile}

oInfo = &handler.OutputInfo{OutputURI: outputFile}
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{
WorkflowNodeInfo: &handler.WorkflowNodeInfo{LaunchedWorkflowID: childID},
OutputInfo: oInfo,
Expand Down
8 changes: 2 additions & 6 deletions pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,12 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler

// TODO optimization, we could just point the outputInfo to the path of the subworkflows output
destinationPath := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
deckPath := v1alpha1.GetDeckFile(nCtx.NodeStatus().GetOutputDir())
metadata, err := nCtx.DataStore().Head(context.Background(), deckPath)
if err != nil || !metadata.Exists() {
deckPath = ""
}

if err := store.CopyRaw(ctx, sourcePath, destinationPath, storage.Options{}); err != nil {
errMsg := fmt.Sprintf("Failed to copy subworkflow outputs from [%v] to [%v]", sourcePath, destinationPath)
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, errMsg, nil)), nil
}
oInfo = &handler.OutputInfo{OutputURI: destinationPath, DeckURI: deckPath}
oInfo = &handler.OutputInfo{OutputURI: destinationPath}
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{
Expand Down
42 changes: 37 additions & 5 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func getPluginMetricKey(pluginID, taskType string) string {
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath, deckPath storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
Expand Down Expand Up @@ -141,8 +141,12 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath, deckPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{OutputURI: outputPath, DeckURI: deckPath}
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
TaskNodeMetadata: taskMetadata,
}
Expand Down Expand Up @@ -479,7 +483,21 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
if ee != nil {
pluginTrns.ObservedExecutionError(ee)
} else {
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), tCtx.ow.GetDeckPath(), &event.TaskNodeMetadata{CacheStatus: cacheStatus.GetCacheStatus(), CatalogKey: cacheStatus.GetMetadata()})
var deckUri *storage.DataReference
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckUriValue := tCtx.ow.GetDeckPath()
deckUri = &deckUriValue
}

pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckUri,
&event.TaskNodeMetadata{
CacheStatus: cacheStatus.GetCacheStatus(),
CatalogKey: cacheStatus.GetMetadata(),
})
}
}

Expand Down Expand Up @@ -522,27 +540,41 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
logger.Errorf(ctx, "failed to check catalog cache with error")
return handler.UnknownTransition, err
}

if entry.GetStatus().GetCacheStatus() == core.CatalogCacheStatus_CACHE_HIT {
r := tCtx.ow.GetReader()
if r == nil {
return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "failed to reader outputs from a CacheHIT. Unexpected!")
}

// TODO @kumare this can be optimized, if we have paths then the reader could be pipelined to a sink
o, ee, err := r.Read(ctx)
if err != nil {
logger.Errorf(ctx, "failed to read from catalog, err: %s", err.Error())
return handler.UnknownTransition, err
}

if ee != nil {
logger.Errorf(ctx, "got execution error from catalog output reader? This should not happen, err: %s", ee.String())
return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "execution error from a cache output, bad state: %s", ee.String())
}

if err := nCtx.DataStore().WriteProtobuf(ctx, tCtx.ow.GetOutputPath(), storage.Options{}, o); err != nil {
logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error())
return handler.UnknownTransition, err
}

pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), *r.GetDeckPath(), entry)
var deckUri *storage.DataReference
exists, err := r.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return handler.UnknownTransition, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckUriValue := tCtx.ow.GetDeckPath()
deckUri = &deckUriValue
}

pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), deckUri, entry)
} else {
logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String())
pluginTrns.PopulateCacheInfo(entry)
Expand Down
6 changes: 2 additions & 4 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,7 @@ func Test_task_Handle_Catalog(t *testing.T) {
c := &pluginCatalogMocks.Client{}
if tt.args.catalogFetch {
or := &ioMocks.OutputReader{}
deckPath := storage.DataReference(deckPath)
or.OnGetDeckPath().Return(&deckPath)
or.OnDeckExistsMatch(mock.Anything).Return(true, nil)
or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil)
c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil)
} else {
Expand Down Expand Up @@ -1125,8 +1124,7 @@ func Test_task_Handle_Reservation(t *testing.T) {
nCtx.OnNodeStateReader().Return(nr)
if tt.args.catalogFetch {
or := &ioMocks.OutputReader{}
deckPath := storage.DataReference("deck.html")
or.OnGetDeckPath().Return(&deckPath)
or.OnDeckExistsMatch(mock.Anything).Return(true, nil)
or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil)
c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil)
} else {
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
}
}
if eInfo != nil && eInfo.OutputInfo != nil {
nev.DeckUri = eInfo.OutputInfo.DeckURI.String()
if eInfo.OutputInfo.DeckURI != nil {
nev.DeckUri = eInfo.OutputInfo.DeckURI.String()
}

nev.OutputResult = ToNodeExecOutput(eInfo.OutputInfo)
} else if info.GetErr() != nil {
nev.OutputResult = &event.NodeExecutionEvent_Error{
Expand Down

0 comments on commit ca14d63

Please sign in to comment.