diff --git a/cmd/kubectl-flyte/cmd/printers/node.go b/cmd/kubectl-flyte/cmd/printers/node.go
index 9bf86419b0..d1986075b3 100644
--- a/cmd/kubectl-flyte/cmd/printers/node.go
+++ b/cmd/kubectl-flyte/cmd/printers/node.go
@@ -23,7 +23,7 @@ func ColorizeNodePhase(p v1alpha1.NodePhase) string {
 	switch p {
 	case v1alpha1.NodePhaseNotYetStarted:
 		return p.String()
-	case v1alpha1.NodePhaseRunning:
+	case v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseDynamicRunning:
 		return color.YellowString("%s", p.String())
 	case v1alpha1.NodePhaseSucceeded:
 		return color.HiGreenString("%s", p.String())
diff --git a/go.mod b/go.mod
index c3349789e8..00b08e8d41 100644
--- a/go.mod
+++ b/go.mod
@@ -6,9 +6,9 @@ require (
 	github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
 	github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
 	github.com/fatih/color v1.10.0
-	github.com/flyteorg/flyteidl v0.18.31
+	github.com/flyteorg/flyteidl v0.18.39
 	github.com/flyteorg/flyteplugins v0.5.42
-	github.com/flyteorg/flytestdlib v0.3.16
+	github.com/flyteorg/flytestdlib v0.3.17
 	github.com/ghodss/yaml v1.0.0
 	github.com/go-redis/redis v6.15.7+incompatible
 	github.com/go-test/deep v1.0.7
diff --git a/go.sum b/go.sum
index 36831c66e7..78df63fd59 100644
--- a/go.sum
+++ b/go.sum
@@ -72,6 +72,7 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
 github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
 github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
 github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
@@ -231,14 +232,18 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
 github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
 github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
 github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
-github.com/flyteorg/flyteidl v0.18.31 h1:aEu9HRT2GsiVKJsHYBEUA+zkPgCWFSokE0/kixYQiFY=
-github.com/flyteorg/flyteidl v0.18.31/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
+github.com/flyteorg/flyteidl v0.18.33 h1:CJFhjBoAA9Dmv8PdE4geYrupwb8mxzmHDHbCjneBiLw=
+github.com/flyteorg/flyteidl v0.18.33/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
+github.com/flyteorg/flyteidl v0.18.36-0.20210417001501-910f8f60f11b h1:HqCfdGY2keiyyIqOfF+QzdUt7pgHJLxw4GxTBAPvQe4=
+github.com/flyteorg/flyteidl v0.18.36-0.20210417001501-910f8f60f11b/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
+github.com/flyteorg/flyteidl v0.18.39 h1:AxsG5fyx7TMWRmI8Bb96nVsdC3njjrqVSlO/UmOIL7k=
+github.com/flyteorg/flyteidl v0.18.39/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
 github.com/flyteorg/flyteplugins v0.5.42 h1:G4DRR2r8LlmkV+orXloDi1ly+M5WuvAaNlWFgGGyy3A=
 github.com/flyteorg/flyteplugins v0.5.42/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
 github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
 github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
-github.com/flyteorg/flytestdlib v0.3.16 h1:wO/tk0KfLIpLS89GbJWG4nNADWnOse4nQbTDN/mEvzM=
-github.com/flyteorg/flytestdlib v0.3.16/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU=
+github.com/flyteorg/flytestdlib v0.3.17 h1:7OexDLAjTBzJNGMmKKFmUTkss0I9IFo1LdTMpvH4qqA=
+github.com/flyteorg/flytestdlib v0.3.17/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU=
 github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
 github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
 github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
@@ -1231,6 +1236,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
+honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
 k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go
index dea6a57bca..f54c502641 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/iface.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go
@@ -63,6 +63,7 @@ const (
 	NodePhaseRetryableFailure
 	NodePhaseTimingOut
 	NodePhaseTimedOut
+	NodePhaseDynamicRunning
 )

 func (p NodePhase) String() string {
@@ -89,6 +90,8 @@ func (p NodePhase) String() string {
 		return "Skipped"
 	case NodePhaseRetryableFailure:
 		return "RetryableFailure"
+	case NodePhaseDynamicRunning:
+		return "DynamicRunning"
 	}

 	return "Unknown"
diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go
index f7b8c79eef..aaed1357a7 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go
@@ -89,6 +89,8 @@ const (
 	DynamicNodePhaseFailing
 	// This Phase implies that the Parent node is done but it needs to be finalized before progressing to the sub-nodes (or dynamically yielded nodes)
 	DynamicNodePhaseParentFinalizing
+	// This Phase implies that the Parent node has finalized and the sub-nodes (or dynamically yielded nodes) can now be processed.
+	DynamicNodePhaseParentFinalized
 )

 type DynamicNodeStatus struct {
diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go
index 9e7d580d1e..06148b4716 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go
@@ -85,6 +85,7 @@ type EventVersion int
 const (
 	EventVersion0 EventVersion = iota
 	EventVersion1
+	EventVersion2
 )

 type NodeDefaults struct {
diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go
index 3d0e2dc078..36b368cf90 100644
--- a/pkg/controller/nodes/dynamic/dynamic_workflow.go
+++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go
@@ -5,14 +5,11 @@ import (
 	"fmt"
 	"strconv"

+	"k8s.io/apimachinery/pkg/util/rand"
+
+	node_common "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"
 	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

-	"github.com/flyteorg/flytestdlib/errors"
-	"github.com/flyteorg/flytestdlib/logger"
-	"github.com/flyteorg/flytestdlib/storage"
-
-	"k8s.io/apimachinery/pkg/util/rand"
-
 	"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
 	"github.com/flyteorg/flytepropeller/pkg/compiler"
 	"github.com/flyteorg/flytepropeller/pkg/compiler/common"
@@ -22,15 +19,21 @@ import (
 	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
 	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task"
 	"github.com/flyteorg/flytepropeller/pkg/utils"
+	"github.com/flyteorg/flytestdlib/errors"
+	"github.com/flyteorg/flytestdlib/logger"
+	"github.com/flyteorg/flytestdlib/storage"
 )

 type dynamicWorkflowContext struct {
-	execContext executors.ExecutionContext
-	subWorkflow v1alpha1.ExecutableWorkflow
-	nodeLookup  executors.NodeLookup
-	isDynamic   bool
+	execContext        executors.ExecutionContext
+	subWorkflow        v1alpha1.ExecutableWorkflow
+	subWorkflowClosure *core.CompiledWorkflowClosure
+	nodeLookup         executors.NodeLookup
+	isDynamic          bool
 }

+const dynamicWfNameTemplate = "dynamic_%s"
+
 func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Context, djSpec *core.DynamicJobSpec,
 	nCtx handler.NodeExecutionContext, parentNodeStatus v1alpha1.ExecutableNodeStatus) (*core.WorkflowTemplate, error) {
@@ -106,8 +109,8 @@ func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Con
 		Id: &core.Identifier{
 			Project:      nCtx.NodeExecutionMetadata().GetNodeExecutionID().GetExecutionId().Project,
 			Domain:       nCtx.NodeExecutionMetadata().GetNodeExecutionID().GetExecutionId().Domain,
+			Name:         fmt.Sprintf(dynamicWfNameTemplate, nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId),
 			Version:      rand.String(10),
-			Name:         rand.String(10),
 			ResourceType: core.ResourceType_WORKFLOW,
 		},
 		Nodes: djSpec.Nodes,
@@ -132,23 +135,35 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
 	dynamicNodeStatus.SetOutputDir(nCtx.NodeStatus().GetOutputDir())
 	dynamicNodeStatus.SetParentTaskID(execID)

-	// cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
+	cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
 	// Check if we have compiled the workflow before:
 	// If there is a cached compiled Workflow, load and return it.
-	// if ok, err := f.CacheExists(ctx); err != nil {
-	//	logger.Warnf(ctx, "Failed to call head on compiled futures file. Error: %v", err)
-	//	return nil, false, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to do HEAD on compiled futures file.")
-	// } else if ok {
-	//	// It exists, load and return it
-	//	compiledWf, err := f.RetrieveCache(ctx)
-	//	if err != nil {
-	//		logger.Warnf(ctx, "Failed to load cached flyte workflow , this will cause the dynamic workflow to be recompiled. Error: %v", err)
-	//		d.metrics.CacheError.Inc(ctx)
-	//	} else {
-	//		cacheHitStopWatch.Stop()
-	//		return newContextualWorkflow(nCtx.Workflow(), compiledWf, dynamicNodeStatus, compiledWf.Tasks, compiledWf.SubWorkflows), true, nil
-	//	}
-	// }
+	if ok, err := f.CacheExists(ctx); err != nil {
+		logger.Warnf(ctx, "Failed to call head on compiled workflow files. Error: %v", err)
+		return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "Failed to do HEAD on compiled workflow files.")
+	} else if ok {
+		// It exists, load and return it
+		workflowCacheContents, err := f.RetrieveCache(ctx)
+		if err != nil {
+			logger.Warnf(ctx, "Failed to load cached flyte workflow, this will cause the dynamic workflow to be recompiled. Error: %v", err)
+			d.metrics.CacheError.Inc(ctx)
+		} else {
+			cacheHitStopWatch.Stop()
+			newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
+			if err != nil {
+				return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID")
+			}
+			compiledWf := workflowCacheContents.WorkflowCRD
+			return dynamicWorkflowContext{
+				isDynamic:          true,
+				subWorkflow:        compiledWf,
+				subWorkflowClosure: workflowCacheContents.CompiledWorkflow,
+				execContext:        executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()),
+				nodeLookup:         executors.NewNodeLookup(compiledWf, dynamicNodeStatus),
+			}, nil
+		}
+	}
+	d.metrics.CacheMiss.Inc(ctx)

 	// We know for sure that futures file was generated. Lets read it
 	djSpec, err := f.Read(ctx)
@@ -156,7 +171,6 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
 		return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted")
 	}

-	var closure *core.CompiledWorkflowClosure
 	wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus)
 	if err != nil {
 		return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template")
@@ -184,6 +198,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
 	// The reason they might be missing is because if a user yields a task that is SdkTask.fetch'ed, it should not be included
 	// See https://github.com/flyteorg/flyte/issues/219 for more information.
+	var closure *core.CompiledWorkflowClosure
 	closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces)
 	if err != nil {
 		return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow")
@@ -194,7 +209,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
 		return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow")
 	}

-	if err := f.Cache(ctx, dynamicWf); err != nil {
+	if err := f.Cache(ctx, dynamicWf, closure); err != nil {
 		logger.Errorf(ctx, "Failed to cache Dynamic workflow [%s]", err.Error())
 	}
@@ -205,10 +220,11 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
 		return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID")
 	}
 	return dynamicWorkflowContext{
-		isDynamic:   true,
-		subWorkflow: dynamicWf,
-		execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
-		nodeLookup:  executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
+		isDynamic:          true,
+		subWorkflow:        dynamicWf,
+		subWorkflowClosure: closure,
+		execContext:        executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
+		nodeLookup:         executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
 	}, nil
 }

@@ -228,7 +244,7 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
 	}

 	// As we do not support Failure Node, we can just return failure in this case
-	return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)),
+	return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)),
 		handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Dynamic workflow failed", Error: state.Err}, nil
 }

@@ -278,7 +294,7 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
 		}
 	}

-	return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), prevState, nil
+	return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)), prevState, nil
 }

 func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, launchPlanIDs []compiler.LaunchPlanRefIdentifier) (
diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go
index 708cdede2c..6f0eaac01b 100644
--- a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go
+++ b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go
@@ -1,9 +1,13 @@
 package dynamic

 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"testing"

+	"github.com/pkg/errors"
+
 	"github.com/flyteorg/flytepropeller/pkg/utils"

 	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
@@ -11,6 +15,7 @@ import (
 	mocks3 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
 	"github.com/flyteorg/flytestdlib/promutils"
 	"github.com/flyteorg/flytestdlib/storage"
+	storageMocks "github.com/flyteorg/flytestdlib/storage/mocks"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,7 +31,7 @@ import (
 )

 func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *testing.T) {
-	createNodeContext := func(ttype string, finalOutput storage.DataReference) *mocks.NodeExecutionContext {
+	createNodeContext := func(ttype string, finalOutput storage.DataReference, dataStore *storage.DataStore) *mocks.NodeExecutionContext {
 		ctx := context.TODO()

 		wfExecID := &core.WorkflowExecutionIdentifier{
@@ -78,8 +83,11 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
 		tID := "dyn-task-1"
 		n.OnGetTaskID().Return(&tID)

-		dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
-		assert.NoError(t, err)
+		if dataStore == nil {
+			var err error
+			dataStore, err = storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
+			assert.NoError(t, err)
+		}

 		ir := &mocks3.InputReader{}
 		nCtx := &mocks.NodeExecutionContext{}
@@ -143,7 +151,7 @@
 		}
 		djSpec := createDynamicJobSpecWithLaunchPlans()
 		finalOutput := storage.DataReference("/subnode")
-		nCtx := createNodeContext("test", finalOutput)
+		nCtx := createNodeContext("test", finalOutput, nil)
 		s := &dynamicNodeStateHolder{}
 		nCtx.On("NodeStateWriter").Return(s)
 		f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb")
@@ -195,6 +203,7 @@
 		assert.True(t, callsAdmin)
 		assert.True(t, dCtx.isDynamic)
 		assert.NotNil(t, dCtx.subWorkflow)
+		assert.NotNil(t, dCtx.subWorkflowClosure)
 		assert.NotNil(t, dCtx.execContext)
 		assert.NotNil(t, dCtx.execContext.GetParentInfo())
 		expectedParentUniqueID, err := utils.FixedLengthUniqueIDForParts(20, "c1", "2", "n1")
@@ -214,7 +223,7 @@
 		}
 		djSpec := createDynamicJobSpecWithLaunchPlans()
 		finalOutput := storage.DataReference("/subnode")
-		nCtx := createNodeContext("test", finalOutput)
+		nCtx := createNodeContext("test", finalOutput, nil)
 		s := &dynamicNodeStateHolder{}
 		nCtx.On("NodeStateWriter").Return(s)
 		f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb")
@@ -263,6 +272,7 @@
 		assert.True(t, callsAdmin)
 		assert.True(t, dCtx.isDynamic)
 		assert.NotNil(t, dCtx.subWorkflow)
+		assert.NotNil(t, dCtx.subWorkflowClosure)
 		assert.NotNil(t, dCtx.execContext)
 		assert.NotNil(t, dCtx.execContext.GetParentInfo())
 		expectedParentUniqueID, err := utils.FixedLengthUniqueIDForParts(20, "", "", "n1")
@@ -282,7 +292,7 @@
 		}
 		djSpec := createDynamicJobSpecWithLaunchPlans()
 		finalOutput := storage.DataReference("/subnode")
-		nCtx := createNodeContext("test", finalOutput)
+		nCtx := createNodeContext("test", finalOutput, nil)
 		s := &dynamicNodeStateHolder{}
 		nCtx.OnNodeStateWriter().Return(s)
 		f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb")
@@ -329,4 +339,250 @@
 		assert.Error(t, err)
 		assert.True(t, callsAdmin)
 	})
+	t.Run("dynamic wf cached", func(t *testing.T) {
+		ctx := context.Background()
+		lpID := &core.Identifier{
+			ResourceType: core.ResourceType_LAUNCH_PLAN,
+			Name:         "my_plan",
+			Project:      "p",
+			Domain:       "d",
+		}
+		djSpec := createDynamicJobSpecWithLaunchPlans()
+		finalOutput := storage.DataReference("/subnode")
+		nCtx := createNodeContext("test", finalOutput, nil)
+
+		s := &dynamicNodeStateHolder{}
+		nCtx.On("NodeStateWriter").Return(s)
+
+		// Create a k8s Flyte workflow and store that in the cache
+		dynamicWf := &v1alpha1.FlyteWorkflow{
+			ServiceAccountName: "sa",
+		}
+
+		rawDynamicWf, err := json.Marshal(dynamicWf)
+		assert.NoError(t, err)
+		_, err = nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures_compiled.pb")
+		assert.NoError(t, err)
+		assert.NoError(t, nCtx.DataStore().WriteRaw(context.TODO(), storage.DataReference("/output-dir/futures_compiled.pb"), int64(len(rawDynamicWf)), storage.Options{}, bytes.NewReader(rawDynamicWf)))
+
+		f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb")
+		assert.NoError(t, err)
+		assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, djSpec))
+
+		f, err = nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "dynamic_compiled.pb")
+		assert.NoError(t, err)
+		assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, &core.CompiledWorkflowClosure{
+			Primary: &core.CompiledWorkflow{
+				Template: &core.WorkflowTemplate{
+					Id: &core.Identifier{
+						ResourceType: core.ResourceType_WORKFLOW,
+					},
+				},
+			},
+		}))
+
+		mockLPLauncher := &mocks5.Reader{}
+		var callsAdmin = false
+		mockLPLauncher.OnGetLaunchPlanMatch(ctx, lpID).Run(func(args mock.Arguments) {
+			// When a launch plan node is detected, a call should be made to Admin to fetch the interface for the LP.
+			// However, in the cached case no such call should be necessary.
+			callsAdmin = true
+		}).Return(&admin.LaunchPlan{
+			Id: lpID,
+			Closure: &admin.LaunchPlanClosure{
+				ExpectedInputs: &core.ParameterMap{},
+				ExpectedOutputs: &core.VariableMap{
+					Variables: map[string]*core.Variable{
+						"x": {
+							Type: &core.LiteralType{
+								Type: &core.LiteralType_Simple{
+									Simple: core.SimpleType_INTEGER,
+								},
+							},
+							Description: "output of the launch plan",
+						},
+					},
+				},
+			},
+		}, nil)
+		h := &mocks6.TaskNodeHandler{}
+		n := &mocks4.Node{}
+		d := dynamicNodeTaskNodeHandler{
+			TaskNodeHandler: h,
+			nodeExecutor:    n,
+			lpReader:        mockLPLauncher,
+			metrics:         newMetrics(promutils.NewTestScope()),
+		}
+
+		execContext := &mocks4.ExecutionContext{}
+		immutableParentInfo := mocks4.ImmutableParentInfo{}
+		immutableParentInfo.OnGetUniqueID().Return("c1")
+		immutableParentInfo.OnCurrentAttempt().Return(uint32(2))
+		execContext.OnGetParentInfo().Return(&immutableParentInfo)
+		execContext.OnGetEventVersion().Return(v1alpha1.EventVersion1)
+		nCtx.OnExecutionContext().Return(execContext)
+
+		dCtx, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
+		assert.NoError(t, err)
+		assert.False(t, callsAdmin)
+		assert.True(t, dCtx.isDynamic)
+		assert.NotNil(t, dCtx.subWorkflow)
+		assert.NotNil(t, dCtx.execContext)
+		assert.NotNil(t, dCtx.execContext.GetParentInfo())
+		expectedParentUniqueID, err := utils.FixedLengthUniqueIDForParts(20, "c1", "2", "n1")
+		assert.Nil(t, err)
+		assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID())
+		assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt())
+		assert.NotNil(t, dCtx.nodeLookup)
+	})
+
+	t.Run("dynamic wf cache read fails", func(t *testing.T) {
+		ctx := context.Background()
+		finalOutput := storage.DataReference("/subnode")
+
+		composedPBStore := storageMocks.ComposedProtobufStore{}
+		composedPBStore.On("Head", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb")).
+			Return(nil, errors.New("foo"))
+		referenceConstructor := storageMocks.ReferenceConstructor{}
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "futures.pb").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/futures.pb"), nil)
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "futures_compiled.pb").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), nil)
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "dynamic_compiled.pb").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/dynamic_compiled.pb"), nil)
+		dataStore := &storage.DataStore{
+			ComposedProtobufStore: &composedPBStore,
+			ReferenceConstructor:  &referenceConstructor,
+		}
+
+		nCtx := createNodeContext("test", finalOutput, dataStore)
+		nCtx.OnCurrentAttempt().Return(uint32(1))
+		mockLPLauncher := &mocks5.Reader{}
+
+		h := &mocks6.TaskNodeHandler{}
+		n := &mocks4.Node{}
+		d := dynamicNodeTaskNodeHandler{
+			TaskNodeHandler: h,
+			nodeExecutor:    n,
+			lpReader:        mockLPLauncher,
+			metrics:         newMetrics(promutils.NewTestScope()),
+		}
+
+		execContext := &mocks4.ExecutionContext{}
+		immutableParentInfo := mocks4.ImmutableParentInfo{}
+		immutableParentInfo.OnGetUniqueID().Return("c1")
+		immutableParentInfo.OnCurrentAttempt().Return(uint32(2))
+		execContext.OnGetParentInfo().Return(&immutableParentInfo)
+		execContext.OnGetEventVersion().Return(v1alpha1.EventVersion1)
+		nCtx.OnExecutionContext().Return(execContext)
+
+		_, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
+		assert.EqualError(t, err, "[system] Failed to do HEAD on compiled workflow files., caused by: Failed to do HEAD on futures file.: foo")
+	})
+	t.Run("dynamic wf cache write fails", func(t *testing.T) {
+		ctx := context.Background()
+		finalOutput := storage.DataReference("/subnode")
+
+		metadata := existsMetadata{}
+		composedPBStore := storageMocks.ComposedProtobufStore{}
+		composedPBStore.On("Head", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb")).
+			Return(&metadata, nil)
+
+		djSpec := createDynamicJobSpecWithLaunchPlans()
+		composedPBStore.On("ReadProtobuf", mock.MatchedBy(func(ctx context.Context) bool { return true }),
+			storage.DataReference("s3://my-s3-bucket/foo/bar/futures.pb"), &core.DynamicJobSpec{}).Return(nil).Run(func(args mock.Arguments) {
+			djSpecPtr := args.Get(2).(*core.DynamicJobSpec)
+			*djSpecPtr = *djSpec
+		})
+		composedPBStore.On("WriteRaw",
+			mock.MatchedBy(func(ctx context.Context) bool { return true }),
+			storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
+			int64(1039),
+			storage.Options{},
+			mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))
+
+		referenceConstructor := storageMocks.ReferenceConstructor{}
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "futures.pb").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/futures.pb"), nil)
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "dynamic_compiled.pb").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/dynamic_compiled.pb"), nil)
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "Node_1").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/Node_1"), nil)
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/Node_1"), "0").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/Node_1/0"), nil)
+		referenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("output-dir"), "futures_compiled.pb").Return(
+			storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), nil)
+		dataStore := &storage.DataStore{
+			ComposedProtobufStore: &composedPBStore,
+			ReferenceConstructor:  &referenceConstructor,
+		}
+
+		nCtx := createNodeContext("test", finalOutput, dataStore)
+		nCtx.OnCurrentAttempt().Return(uint32(1))
+
+		lpID := &core.Identifier{
+			ResourceType: core.ResourceType_LAUNCH_PLAN,
+			Name:         "my_plan",
+			Project:      "p",
+			Domain:       "d",
+		}
+		mockLPLauncher := &mocks5.Reader{}
+		mockLPLauncher.OnGetLaunchPlanMatch(ctx, lpID).Return(&admin.LaunchPlan{
+			Id: lpID,
+			Closure: &admin.LaunchPlanClosure{
+				ExpectedInputs: &core.ParameterMap{},
+				ExpectedOutputs: &core.VariableMap{
+					Variables: map[string]*core.Variable{
+						"x": {
+							Type: &core.LiteralType{
+								Type: &core.LiteralType_Simple{
+									Simple: core.SimpleType_INTEGER,
+								},
+							},
+							Description: "output of the launch plan",
+						},
+					},
+				},
+			},
+		}, nil)
+
+		h := &mocks6.TaskNodeHandler{}
+		n := &mocks4.Node{}
+		d := dynamicNodeTaskNodeHandler{
+			TaskNodeHandler: h,
+			nodeExecutor:    n,
+			lpReader:        mockLPLauncher,
+			metrics:         newMetrics(promutils.NewTestScope()),
+		}
+
+		execContext := &mocks4.ExecutionContext{}
+		immutableParentInfo := mocks4.ImmutableParentInfo{}
+		immutableParentInfo.OnGetUniqueID().Return("c1")
+		immutableParentInfo.OnCurrentAttempt().Return(uint32(2))
+		execContext.OnGetParentInfo().Return(&immutableParentInfo)
+		execContext.OnGetEventVersion().Return(v1alpha1.EventVersion1)
+		nCtx.OnExecutionContext().Return(execContext)
+
+		dCtx, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
+		assert.NoError(t, err)
+		assert.True(t, dCtx.isDynamic)
+		assert.NotNil(t, dCtx.subWorkflow)
+		assert.NotNil(t, dCtx.execContext)
+		assert.NotNil(t, dCtx.execContext.GetParentInfo())
+		expectedParentUniqueID, err := utils.FixedLengthUniqueIDForParts(20, "c1", "2", "n1")
+		assert.Nil(t, err)
+		assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID())
+		assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt())
+		assert.NotNil(t, dCtx.nodeLookup)
+	})
+}
+
+type existsMetadata struct{}
+
+func (e existsMetadata) Exists() bool {
+	return false
+}
+
+func (e existsMetadata) Size() int64 {
+	return int64(1)
 }
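A note on the compilation cache these tests exercise: the patch keeps three artifacts under the node's output directory. futures.pb is the DynamicJobSpec the parent task wrote, futures_compiled.pb is the JSON-marshalled FlyteWorkflow CRD, and the new dynamic_compiled.pb is the CompiledWorkflowClosure protobuf. The following standalone Go sketch (not part of the patch; toy types stand in for FutureFileReader and the handler types) paraphrases the cache-or-compile decision in buildContextualDynamicWorkflow: a failed HEAD is surfaced as a system error, a failed cache read falls back to recompilation, and a failed cache write is only logged.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Toy stand-ins: workflowCRD is really *v1alpha1.FlyteWorkflow and
// compiledWorkflow is really *core.CompiledWorkflowClosure.
type cacheContents struct {
	workflowCRD      string
	compiledWorkflow string
}

type futureFile struct {
	cached *cacheContents
}

func (f *futureFile) cacheExists(ctx context.Context) (bool, error) { return f.cached != nil, nil }

func (f *futureFile) retrieveCache(ctx context.Context) (cacheContents, error) {
	if f.cached == nil {
		return cacheContents{}, errors.New("not cached")
	}
	return *f.cached, nil
}

// loadOrCompile mirrors the patched decision tree: HEAD first, then load;
// on a cache read error fall through and recompile; cache the fresh result.
func loadOrCompile(ctx context.Context, f *futureFile) (cacheContents, bool, error) {
	if ok, err := f.cacheExists(ctx); err != nil {
		return cacheContents{}, false, err // system error: surface it
	} else if ok {
		if c, err := f.retrieveCache(ctx); err == nil {
			return c, true, nil // cache hit
		}
		// cache read failed: recompile below
	}
	c := cacheContents{workflowCRD: "compiled-crd", compiledWorkflow: "compiled-closure"}
	f.cached = &c // best-effort write; the real code only logs a failure here
	return c, false, nil
}

func main() {
	f := &futureFile{}
	_, hit, _ := loadOrCompile(context.Background(), f)
	fmt.Println("first call cache hit:", hit) // false
	_, hit, _ = loadOrCompile(context.Background(), f)
	fmt.Println("second call cache hit:", hit) // true
}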
diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go
index 3299df1fbb..4a7261e8e6 100644
--- a/pkg/controller/nodes/dynamic/handler.go
+++ b/pkg/controller/nodes/dynamic/handler.go
@@ -42,6 +42,7 @@ type metrics struct {
 	retrieveDynamicJobSpec labeled.StopWatch
 	CacheHit               labeled.StopWatch
 	CacheError             labeled.Counter
+	CacheMiss              labeled.Counter
 }

 func newMetrics(scope promutils.Scope) metrics {
@@ -50,6 +51,7 @@ func newMetrics(scope promutils.Scope) metrics {
 		retrieveDynamicJobSpec: labeled.NewStopWatch("retrieve_dynamic_spec", "Overhead of downloading and un-marshaling dynamic job spec", time.Microsecond, scope),
 		CacheHit:               labeled.NewStopWatch("dynamic_workflow_cache_hit", "A dynamic workflow was loaded from store.", time.Microsecond, scope),
 		CacheError:             labeled.NewCounter("cache_err", "A dynamic workflow failed to store or load from data store.", scope),
+		CacheMiss:              labeled.NewCounter("cache_miss", "A dynamic workflow did not already exist in the data store.", scope),
 	}
 }

@@ -79,7 +81,7 @@ func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevSt
 	}
 	if ok {
 		// Mark the node that parent node has completed and a dynamic node executing its child nodes. Next time check node status is called, it'll go
-		// directly to progress the dynamically generated workflow.
+		// directly to record, and then progress the dynamically generated workflow.
 		logger.Infof(ctx, "future file detected, assuming dynamic node")
 		// There is a futures file, so we need to continue running the node with the modified state
 		return trns.WithInfo(handler.PhaseInfoRunning(trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalizing}, nil
@@ -90,6 +92,35 @@ func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevSt
 	return trns, prevState, nil
 }

+func (d dynamicNodeTaskNodeHandler) produceDynamicWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext) (
+	handler.Transition, handler.DynamicNodeState, error) {
+	// The first time this is called we go ahead and evaluate the dynamic node to build the workflow. We then cache
+	// this workflow definition and send it to be persisted by flyteadmin so that users can observe the structure.
+	dCtx, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
+	if err != nil {
+		if stdErrors.IsCausedBy(err, utils.ErrorCodeUser) {
+			return handler.DoTransition(handler.TransitionTypeEphemeral,
+				handler.PhaseInfoFailure(core.ExecutionError_USER, "DynamicWorkflowBuildFailed", err.Error(), nil),
+			), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil
+		}
+		return handler.Transition{}, handler.DynamicNodeState{}, err
+	}
+	taskNodeInfoMetadata := &event.TaskNodeMetadata{}
+	if dCtx.subWorkflowClosure != nil && dCtx.subWorkflowClosure.Primary != nil && dCtx.subWorkflowClosure.Primary.Template != nil {
+		taskNodeInfoMetadata.DynamicWorkflow = &event.DynamicWorkflowNodeMetadata{
+			Id:               dCtx.subWorkflowClosure.Primary.Template.Id,
+			CompiledWorkflow: dCtx.subWorkflowClosure,
+		}
+	}
+
+	nextState := handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseExecuting}
+	return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(&handler.ExecutionInfo{
+		TaskNodeInfo: &handler.TaskNodeInfo{
+			TaskNodeMetadata: taskNodeInfoMetadata,
+		},
+	})), nextState, nil
+}
+
 func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) {
 	dCtx, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
 	if err != nil {
@@ -98,7 +129,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
 				handler.PhaseInfoFailure(core.ExecutionError_USER, "DynamicWorkflowBuildFailed", err.Error(), nil),
 			), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil
 		}
-		// Mostly a system error or unknnwn
+		// Mostly a system error or unknown
 		return handler.Transition{}, handler.DynamicNodeState{}, err
 	}

@@ -130,7 +161,8 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
 			return trns.WithInfo(handler.PhaseInfoFailureErr(ee.ExecutionError, trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: ee.ExecutionError.String()}, nil
 		}

-		trns.WithInfo(trns.Info().WithInfo(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{TaskNodeMetadata: &event.TaskNodeMetadata{CacheStatus: status.GetCacheStatus(), CatalogKey: status.GetMetadata()}}}))
+		taskNodeInfoMetadata := &event.TaskNodeMetadata{CacheStatus: status.GetCacheStatus(), CatalogKey: status.GetMetadata()}
+		trns.WithInfo(trns.Info().WithInfo(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{TaskNodeMetadata: taskNodeInfoMetadata}}))
 	}

 	return trns, newState, nil
@@ -139,6 +171,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
 // The State machine for a dynamic node is as follows
 // DynamicNodePhaseNone: The parent node is being handled
 // DynamicNodePhaseParentFinalizing: The parent node has completes successfully and sub-nodes exist (futures file found). Parent node is being finalized.
+// DynamicNodePhaseParentFinalized: The parent node has completed successfully and the generated dynamic sub workflow has been serialized and sent as an event.
 // DynamicNodePhaseExecuting: The parent node has completed and finalized successfully, the sub-nodes are being handled
 // DynamicNodePhaseFailing: one or more of sub-nodes have failed and the failure is being handled
 func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
@@ -171,8 +204,14 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod
 		if err := d.finalizeParentNode(ctx, nCtx); err != nil {
 			return handler.UnknownTransition, err
 		}
-		newState = handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseExecuting}
+		newState = handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseParentFinalized}
 		trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(trns.Info().GetInfo()))
+	case v1alpha1.DynamicNodePhaseParentFinalized:
+		trns, newState, err = d.produceDynamicWorkflow(ctx, nCtx)
+		if err != nil {
+			logger.Errorf(ctx, "producing dynamic workflow definition failed with error: %s", err.Error())
+			return trns, err
+		}
 	default:
 		trns, newState, err = d.handleParentNode(ctx, ds, nCtx)
 		if err != nil {
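To make the new control flow easier to follow: Handle now parks a finished parent in DynamicNodePhaseParentFinalized, and the next evaluation round runs produceDynamicWorkflow, which reports the compiled closure (as DynamicWorkflowNodeMetadata on a DynamicRunning event) before the sub-nodes begin executing. Below is a condensed standalone sketch of the happy-path transitions (toy enum, not part of the patch; the real handler also moves to a failing phase on errors):

package main

import "fmt"

type dynPhase int

const (
	phaseNone dynPhase = iota // parent task still running
	phaseParentFinalizing     // futures file found, parent being finalized
	phaseParentFinalized      // new in this patch: compiled workflow reported here
	phaseExecuting            // sub-nodes progressing
)

var phaseNames = []string{"None", "ParentFinalizing", "ParentFinalized", "Executing"}

// step condenses dynamicNodeTaskNodeHandler.Handle to its happy path.
func step(p dynPhase) dynPhase {
	switch p {
	case phaseNone:
		return phaseParentFinalizing // parent done and futures.pb detected
	case phaseParentFinalizing:
		return phaseParentFinalized // finalizeParentNode succeeded
	case phaseParentFinalized:
		return phaseExecuting // produceDynamicWorkflow sent the closure event
	default:
		return phaseExecuting // progressDynamicWorkflow keeps driving sub-nodes
	}
}

func main() {
	for p := phaseNone; p != phaseExecuting; p = step(p) {
		fmt.Println(phaseNames[p], "->", phaseNames[step(p)])
	}
}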
diff --git a/pkg/controller/nodes/dynamic/handler_test.go b/pkg/controller/nodes/dynamic/handler_test.go
index 5559f9db6c..dca24c239c 100644
--- a/pkg/controller/nodes/dynamic/handler_test.go
+++ b/pkg/controller/nodes/dynamic/handler_test.go
@@ -518,15 +518,15 @@ func Test_dynamicNodeHandler_Handle_SubTaskV1(t *testing.T) {
 		want want
 	}{
 		{"error", args{isErr: true, dj: createDynamicJobSpec()}, want{isErr: true}},
-		{"success", args{s: executors.NodeStatusSuccess, dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"success", args{s: executors.NodeStatusSuccess, dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
 		{"complete", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), generateOutputs: true}, want{p: handler.EPhaseSuccess, phase: v1alpha1.DynamicNodePhaseExecuting}},
 		{"complete-no-outputs", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), generateOutputs: false}, want{p: handler.EPhaseRetryableFailure, phase: v1alpha1.DynamicNodePhaseFailing}},
 		{"complete-valid-error-retryable", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{IsRecoverable: true}, generateOutputs: true}, want{p: handler.EPhaseRetryableFailure, phase: v1alpha1.DynamicNodePhaseFailing}},
 		{"complete-valid-error", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{}, generateOutputs: true}, want{p: handler.EPhaseFailed, phase: v1alpha1.DynamicNodePhaseFailing}},
-		{"failed", args{s: executors.NodeStatusFailed(&core.ExecutionError{}), dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseFailing}},
-		{"running", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
-		{"running-valid-err", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{}}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
-		{"queued", args{s: executors.NodeStatusQueued, dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"failed", args{s: executors.NodeStatusFailed(&core.ExecutionError{}), dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseFailing}},
+		{"running", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"running-valid-err", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{}}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"queued", args{s: executors.NodeStatusQueued, dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -706,15 +706,15 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
 		want want
 	}{
 		{"error", args{isErr: true, dj: createDynamicJobSpec()}, want{isErr: true}},
-		{"success", args{s: executors.NodeStatusSuccess, dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"success", args{s: executors.NodeStatusSuccess, dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
 		{"complete", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), generateOutputs: true}, want{p: handler.EPhaseSuccess, phase: v1alpha1.DynamicNodePhaseExecuting}},
 		{"complete-no-outputs", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), generateOutputs: false}, want{p: handler.EPhaseRetryableFailure, phase: v1alpha1.DynamicNodePhaseFailing}},
 		{"complete-valid-error-retryable", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{IsRecoverable: true}, generateOutputs: true}, want{p: handler.EPhaseRetryableFailure, phase: v1alpha1.DynamicNodePhaseFailing}},
 		{"complete-valid-error", args{s: executors.NodeStatusComplete, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{}, generateOutputs: true}, want{p: handler.EPhaseFailed, phase: v1alpha1.DynamicNodePhaseFailing}},
-		{"failed", args{s: executors.NodeStatusFailed(&core.ExecutionError{}), dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseFailing}},
-		{"running", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
-		{"running-valid-err", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{}}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
-		{"queued", args{s: executors.NodeStatusQueued, dj: createDynamicJobSpec()}, want{p: handler.EPhaseRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"failed", args{s: executors.NodeStatusFailed(&core.ExecutionError{}), dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseFailing}},
+		{"running", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"running-valid-err", args{s: executors.NodeStatusRunning, dj: createDynamicJobSpec(), validErr: &io.ExecutionError{}}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
+		{"queued", args{s: executors.NodeStatusQueued, dj: createDynamicJobSpec()}, want{p: handler.EPhaseDynamicRunning, phase: v1alpha1.DynamicNodePhaseExecuting}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go
index 07d62fa92d..e751f62d98 100644
--- a/pkg/controller/nodes/executor.go
+++ b/pkg/controller/nodes/executor.go
@@ -376,7 +376,8 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
 	// execErr in phase-info 'p' is only available if node has failed to execute, and the current phase at that time
 	// will be v1alpha1.NodePhaseRunning
 	execErr := p.GetErr()
-	if execErr != nil && (currentPhase == v1alpha1.NodePhaseRunning || currentPhase == v1alpha1.NodePhaseQueued) {
+	if execErr != nil && (currentPhase == v1alpha1.NodePhaseRunning || currentPhase == v1alpha1.NodePhaseQueued ||
+		currentPhase == v1alpha1.NodePhaseDynamicRunning) {
 		endTime := time.Now()
 		startTime := endTime
 		if lastAttemptStartTime != nil {
@@ -650,7 +651,8 @@ func canHandleNode(phase v1alpha1.NodePhase) bool {
 		phase == v1alpha1.NodePhaseFailing ||
 		phase == v1alpha1.NodePhaseTimingOut ||
 		phase == v1alpha1.NodePhaseRetryableFailure ||
-		phase == v1alpha1.NodePhaseSucceeding
+		phase == v1alpha1.NodePhaseSucceeding ||
+		phase == v1alpha1.NodePhaseDynamicRunning
 }

 func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext,
diff --git a/pkg/controller/nodes/handler/ephase_enumer.go b/pkg/controller/nodes/handler/ephase_enumer.go
index ed2318c031..a08c9e445f 100644
--- a/pkg/controller/nodes/handler/ephase_enumer.go
+++ b/pkg/controller/nodes/handler/ephase_enumer.go
@@ -7,9 +7,9 @@ import (
 	"fmt"
 )

-const _EPhaseName = "UndefinedNotReadyQueuedRunningSkipFailedRetryableFailureSuccessTimedoutFailing"
+const _EPhaseName = "UndefinedNotReadyQueuedRunningSkipFailedRetryableFailureSuccessTimedoutFailingDynamicRunning"

-var _EPhaseIndex = [...]uint8{0, 9, 17, 23, 30, 34, 40, 56, 63, 71, 78}
+var _EPhaseIndex = [...]uint8{0, 9, 17, 23, 30, 34, 40, 56, 63, 71, 78, 92}

 func (i EPhase) String() string {
 	if i >= EPhase(len(_EPhaseIndex)-1) {
@@ -18,7 +18,7 @@ func (i EPhase) String() string {
 	return _EPhaseName[_EPhaseIndex[i]:_EPhaseIndex[i+1]]
 }

-var _EPhaseValues = []EPhase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+var _EPhaseValues = []EPhase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

 var _EPhaseNameToValueMap = map[string]EPhase{
 	_EPhaseName[0:9]:   0,
@@ -31,6 +31,7 @@ var _EPhaseNameToValueMap = map[string]EPhase{
 	_EPhaseName[56:63]: 7,
 	_EPhaseName[63:71]: 8,
 	_EPhaseName[71:78]: 9,
+	_EPhaseName[78:92]: 10,
 }

 // EPhaseString retrieves an enum value from the enum constants string name.
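A note on the ephase_enumer.go hunk just above: it is enumer-generated lookup code edited by hand, so the table arithmetic has to stay self-consistent. "DynamicRunning" (14 characters) is appended to the packed name string, and the index array gains one trailing offset, 92 = 78 + 14. A standalone sketch (not part of the patch) of how the generated String() slices that table:

package main

import "fmt"

// Copies of the updated enumer tables from ephase_enumer.go.
const ePhaseName = "UndefinedNotReadyQueuedRunningSkipFailedRetryableFailureSuccessTimedoutFailingDynamicRunning"

var ePhaseIndex = [...]uint8{0, 9, 17, 23, 30, 34, 40, 56, 63, 71, 78, 92}

// phaseString mirrors the generated String(): each phase i owns the
// substring between consecutive offsets in the index table.
func phaseString(i int) string {
	if i >= len(ePhaseIndex)-1 {
		return fmt.Sprintf("EPhase(%d)", i)
	}
	return ePhaseName[ePhaseIndex[i]:ePhaseIndex[i+1]]
}

func main() {
	fmt.Println(phaseString(10)) // DynamicRunning (bytes 78..92)
	fmt.Println(phaseString(3))  // Running
	fmt.Println(phaseString(99)) // EPhase(99), out-of-range fallback
}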
diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go
index 44d95fd4c5..edff42201a 100644
--- a/pkg/controller/nodes/handler/transition_info.go
+++ b/pkg/controller/nodes/handler/transition_info.go
@@ -23,6 +23,7 @@ const (
 	EPhaseSuccess
 	EPhaseTimedout
 	EPhaseFailing
+	EPhaseDynamicRunning
 )

 func (p EPhase) IsTerminal() bool {
@@ -121,6 +122,10 @@
 func PhaseInfoRunning(info *ExecutionInfo) PhaseInfo {
 	return phaseInfo(EPhaseRunning, nil, info, "running")
 }

+func PhaseInfoDynamicRunning(info *ExecutionInfo) PhaseInfo {
+	return phaseInfo(EPhaseDynamicRunning, nil, info, "dynamic workflow running")
+}
+
 func PhaseInfoSuccess(info *ExecutionInfo) PhaseInfo {
 	return phaseInfo(EPhaseSuccess, nil, info, "successfully completed")
 }
diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go
index e0f1962cbd..e9364fb921 100644
--- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go
+++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go
@@ -22,9 +22,8 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"

+	"github.com/flyteorg/flyteidl/clients/go/datacatalog/mocks"
 	"github.com/golang/protobuf/ptypes"
-
-	"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog/datacatalog/mocks"
 )

 func init() {
diff --git a/pkg/controller/nodes/task/catalog/datacatalog/mocks/DataCatalogClient.go b/pkg/controller/nodes/task/catalog/datacatalog/mocks/DataCatalogClient.go
deleted file mode 100644
index afed201332..0000000000
--- a/pkg/controller/nodes/task/catalog/datacatalog/mocks/DataCatalogClient.go
+++ /dev/null
@@ -1,227 +0,0 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
-
-package mocks
-
-import (
-	context "context"
-
-	datacatalog "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
-	grpc "google.golang.org/grpc"
-
-	mock "github.com/stretchr/testify/mock"
-)
-
-// DataCatalogClient is an autogenerated mock type for the DataCatalogClient type
-type DataCatalogClient struct {
-	mock.Mock
-}
-
-// AddTag provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) AddTag(ctx context.Context, in *datacatalog.AddTagRequest, opts ...grpc.CallOption) (*datacatalog.AddTagResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.AddTagResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.AddTagRequest, ...grpc.CallOption) *datacatalog.AddTagResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.AddTagResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.AddTagRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// CreateArtifact provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) CreateArtifact(ctx context.Context, in *datacatalog.CreateArtifactRequest, opts ...grpc.CallOption) (*datacatalog.CreateArtifactResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.CreateArtifactResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.CreateArtifactRequest, ...grpc.CallOption) *datacatalog.CreateArtifactResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.CreateArtifactResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.CreateArtifactRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// CreateDataset provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) CreateDataset(ctx context.Context, in *datacatalog.CreateDatasetRequest, opts ...grpc.CallOption) (*datacatalog.CreateDatasetResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.CreateDatasetResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.CreateDatasetRequest, ...grpc.CallOption) *datacatalog.CreateDatasetResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.CreateDatasetResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.CreateDatasetRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// GetArtifact provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) GetArtifact(ctx context.Context, in *datacatalog.GetArtifactRequest, opts ...grpc.CallOption) (*datacatalog.GetArtifactResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.GetArtifactResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.GetArtifactRequest, ...grpc.CallOption) *datacatalog.GetArtifactResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.GetArtifactResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.GetArtifactRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// GetDataset provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) GetDataset(ctx context.Context, in *datacatalog.GetDatasetRequest, opts ...grpc.CallOption) (*datacatalog.GetDatasetResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.GetDatasetResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.GetDatasetRequest, ...grpc.CallOption) *datacatalog.GetDatasetResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.GetDatasetResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.GetDatasetRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// ListArtifacts provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) ListArtifacts(ctx context.Context, in *datacatalog.ListArtifactsRequest, opts ...grpc.CallOption) (*datacatalog.ListArtifactsResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.ListArtifactsResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.ListArtifactsRequest, ...grpc.CallOption) *datacatalog.ListArtifactsResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.ListArtifactsResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.ListArtifactsRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// ListDatasets provides a mock function with given fields: ctx, in, opts
-func (_m *DataCatalogClient) ListDatasets(ctx context.Context, in *datacatalog.ListDatasetsRequest, opts ...grpc.CallOption) (*datacatalog.ListDatasetsResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *datacatalog.ListDatasetsResponse
-	if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.ListDatasetsRequest, ...grpc.CallOption) *datacatalog.ListDatasetsResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*datacatalog.ListDatasetsResponse)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.ListDatasetsRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
diff --git a/pkg/controller/nodes/task/future_file_reader.go b/pkg/controller/nodes/task/future_file_reader.go
index 25b5ab741a..b6ce7588f0 100644
--- a/pkg/controller/nodes/task/future_file_reader.go
+++ b/pkg/controller/nodes/task/future_file_reader.go
@@ -16,12 +16,14 @@ import (
 // Once closure migration is done, this file should be deleted.
 const implicitFutureFileName = "futures.pb"
 const implicitCompileWorkflowsName = "futures_compiled.pb"
+const implicitCompiledWorkflowClosureName = "dynamic_compiled.pb"

 type FutureFileReader struct {
 	RemoteFileWorkflowStore
-	loc      storage.DataReference
-	cacheLoc storage.DataReference
-	store    *storage.DataStore
+	loc                    storage.DataReference
+	flyteWfCRDCacheLoc     storage.DataReference
+	flyteWfClosureCacheLoc storage.DataReference
+	store                  *storage.DataStore
 }

 func (f FutureFileReader) Exists(ctx context.Context) (bool, error) {
@@ -45,15 +47,39 @@ func (f FutureFileReader) Read(ctx context.Context) (*core.DynamicJobSpec, error
 }

 func (f FutureFileReader) CacheExists(ctx context.Context) (bool, error) {
-	return f.RemoteFileWorkflowStore.Exists(ctx, f.cacheLoc)
+	exists, err := f.RemoteFileWorkflowStore.Exists(ctx, f.flyteWfCRDCacheLoc)
+	if err != nil || !exists {
+		return exists, err
+	}
+	return f.RemoteFileWorkflowStore.Exists(ctx, f.flyteWfClosureCacheLoc)
 }

-func (f FutureFileReader) Cache(ctx context.Context, wf *v1alpha1.FlyteWorkflow) error {
-	return f.RemoteFileWorkflowStore.Put(ctx, wf, f.cacheLoc)
+func (f FutureFileReader) Cache(ctx context.Context, wf *v1alpha1.FlyteWorkflow, workflowClosure *core.CompiledWorkflowClosure) error {
+	err := f.RemoteFileWorkflowStore.PutFlyteWorkflowCRD(ctx, wf, f.flyteWfCRDCacheLoc)
+	if err != nil {
+		return err
+	}
+	return f.RemoteFileWorkflowStore.PutCompiledFlyteWorkflow(ctx, workflowClosure, f.flyteWfClosureCacheLoc)
 }

-func (f FutureFileReader) RetrieveCache(ctx context.Context) (*v1alpha1.FlyteWorkflow, error) {
-	return f.RemoteFileWorkflowStore.Get(ctx, f.cacheLoc)
+type CacheContents struct {
+	WorkflowCRD      *v1alpha1.FlyteWorkflow
+	CompiledWorkflow *core.CompiledWorkflowClosure
+}
+
+func (f FutureFileReader) RetrieveCache(ctx context.Context) (CacheContents, error) {
+	workflowCRD, err := f.RemoteFileWorkflowStore.GetWorkflowCRD(ctx, f.flyteWfCRDCacheLoc)
+	if err != nil {
+		return CacheContents{}, err
+	}
+	compiledWorkflow, err := f.RemoteFileWorkflowStore.GetCompiledWorkflow(ctx, f.flyteWfClosureCacheLoc)
+	if err != nil {
+		return CacheContents{}, err
+	}
+	return CacheContents{
+		WorkflowCRD:      workflowCRD,
+		CompiledWorkflow: compiledWorkflow,
+	}, nil
 }

 func NewRemoteFutureFileReader(ctx context.Context, dataDir storage.DataReference, store *storage.DataStore) (FutureFileReader, error) {
@@ -63,14 +89,21 @@ func NewRemoteFutureFileReader(ctx context.Context, dataDir storage.DataReferenc
 		return FutureFileReader{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to construct data path")
 	}

-	cacheLoc, err := store.ConstructReference(ctx, dataDir, implicitCompileWorkflowsName)
+	flyteWfCRDCacheLoc, err := store.ConstructReference(ctx, dataDir, implicitCompileWorkflowsName)
 	if err != nil {
 		logger.Warnf(ctx, "Failed to construct data path for compile workflows file, error: %s", err)
-		return FutureFileReader{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to construct reference for cache location")
+		return FutureFileReader{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to construct reference for workflow CRD cache location")
 	}

+	flyteWfClosureCacheLoc, err := store.ConstructReference(ctx, dataDir, implicitCompiledWorkflowClosureName)
+	if err != nil {
+		logger.Warnf(ctx, "Failed to construct data path for compiled workflow closure file, error: %s", err)
+		return FutureFileReader{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to construct reference for compiled workflow closure cache location")
+	}
+
 	return FutureFileReader{
 		loc:                     loc,
diff --git a/pkg/controller/nodes/task/remote_workflow_store.go b/pkg/controller/nodes/task/remote_workflow_store.go
index 7a1594051b..49abe98b6e 100644
--- a/pkg/controller/nodes/task/remote_workflow_store.go
+++ b/pkg/controller/nodes/task/remote_workflow_store.go
@@ -5,6 +5,8 @@ import (
 	"context"
 	"encoding/json"
 
+	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
+
 	"github.com/flyteorg/flytestdlib/logger"
 	"github.com/flyteorg/flytestdlib/storage"
 	"github.com/pkg/errors"
@@ -26,7 +28,7 @@ func (r RemoteFileWorkflowStore) Exists(ctx context.Context, path storage.DataRe
 	return metadata.Exists(), nil
 }
 
-func (r RemoteFileWorkflowStore) Put(ctx context.Context, wf *v1alpha1.FlyteWorkflow, target storage.DataReference) error {
+func (r RemoteFileWorkflowStore) PutFlyteWorkflowCRD(ctx context.Context, wf *v1alpha1.FlyteWorkflow, target storage.DataReference) error {
 	raw, err := json.Marshal(wf)
 	if err != nil {
 		return err
@@ -35,7 +37,11 @@ func (r RemoteFileWorkflowStore) Put(ctx context.Context, wf *v1alpha1.FlyteWork
 	return r.store.WriteRaw(ctx, target, int64(len(raw)), storage.Options{}, bytes.NewReader(raw))
 }
 
-func (r RemoteFileWorkflowStore) Get(ctx context.Context, source storage.DataReference) (*v1alpha1.FlyteWorkflow, error) {
+func (r RemoteFileWorkflowStore) PutCompiledFlyteWorkflow(ctx context.Context, workflow *core.CompiledWorkflowClosure, target storage.DataReference) error {
+	return r.store.WriteProtobuf(ctx, target, storage.Options{}, workflow)
+}
+
+func (r RemoteFileWorkflowStore) getRawBytes(ctx context.Context, source storage.DataReference) ([]byte, error) {
 	rawReader, err := r.store.ReadRaw(ctx, source)
 	if err != nil {
@@ -52,9 +58,23 @@ func (r RemoteFileWorkflowStore) Get(ctx context.Context, source storage.DataRef
 	if err != nil {
 		return nil, err
 	}
+	return buf.Bytes(), nil
+}
+
+func (r RemoteFileWorkflowStore) GetWorkflowCRD(ctx context.Context, source storage.DataReference) (*v1alpha1.FlyteWorkflow, error) {
+	wfBytes, err := r.getRawBytes(ctx, source)
+	if err != nil {
+		return nil, err
+	}
 	wf := &v1alpha1.FlyteWorkflow{}
-	return wf, json.Unmarshal(buf.Bytes(), wf)
+	return wf, json.Unmarshal(wfBytes, wf)
+}
+
+func (r RemoteFileWorkflowStore) GetCompiledWorkflow(ctx context.Context, source storage.DataReference) (*core.CompiledWorkflowClosure, error) {
+	var closure core.CompiledWorkflowClosure
+	err := r.store.ReadProtobuf(ctx, source, &closure)
+	return &closure, err
 }
 
 func NewRemoteWorkflowStore(store *storage.DataStore) RemoteFileWorkflowStore {
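Reviewer note: the store now persists two encodings side by side, the CRD as JSON via WriteRaw and the compiled closure as protobuf via WriteProtobuf/ReadProtobuf. A sketch of the protobuf path, assuming an in-memory DataStore like the one the tests below construct; demoClosureRoundTrip is a hypothetical helper, not part of this change.

// demoClosureRoundTrip is hypothetical; it illustrates the protobuf path only.
func demoClosureRoundTrip(ctx context.Context, store *storage.DataStore) error {
	r := NewRemoteWorkflowStore(store)
	target := storage.DataReference("somekey/dynamic_compiled.pb")
	in := &core.CompiledWorkflowClosure{} // placeholder payload
	if err := r.PutCompiledFlyteWorkflow(ctx, in, target); err != nil {
		return err
	}
	out, err := r.GetCompiledWorkflow(ctx, target)
	if err != nil {
		return err
	}
	// Protos must be compared structurally, never with ==; the test below
	// makes the same check via proto.Equal.
	if !proto.Equal(in, out) {
		return errors.New("compiled workflow closure did not round-trip")
	}
	return nil
}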
diff --git a/pkg/controller/nodes/task/remote_workflow_store_test.go b/pkg/controller/nodes/task/remote_workflow_store_test.go
index e20929e020..a5874348bb 100644
--- a/pkg/controller/nodes/task/remote_workflow_store_test.go
+++ b/pkg/controller/nodes/task/remote_workflow_store_test.go
@@ -4,6 +4,10 @@ import (
 	"context"
 	"testing"
 
+	"github.com/golang/protobuf/proto"
+
+	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
+
 	"github.com/flyteorg/flytestdlib/promutils"
 	"github.com/flyteorg/flytestdlib/storage"
 	"github.com/stretchr/testify/assert"
@@ -25,23 +29,48 @@ func createInmemoryStore(t testing.TB) *storage.DataStore {
 
 func Test_cacheFlyteWorkflow(t *testing.T) {
 	store := createInmemoryStore(t)
-	expected := &v1alpha1.FlyteWorkflow{
-		TypeMeta:   v1.TypeMeta{},
-		ObjectMeta: v1.ObjectMeta{},
-		WorkflowSpec: &v1alpha1.WorkflowSpec{
-			ID: "abc",
-			Connections: v1alpha1.Connections{
-				DownstreamEdges: map[v1alpha1.NodeID][]v1alpha1.NodeID{},
-				UpstreamEdges:   map[v1alpha1.NodeID][]v1alpha1.NodeID{},
+	t.Run("cache CRD", func(t *testing.T) {
+		expected := &v1alpha1.FlyteWorkflow{
+			TypeMeta:   v1.TypeMeta{},
+			ObjectMeta: v1.ObjectMeta{},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				ID: "abc",
+				Connections: v1alpha1.Connections{
+					DownstreamEdges: map[v1alpha1.NodeID][]v1alpha1.NodeID{},
+					UpstreamEdges:   map[v1alpha1.NodeID][]v1alpha1.NodeID{},
+				},
 			},
-		},
-	}
+		}
 
-	ctx := context.TODO()
-	location := storage.DataReference("somekey/file.json")
-	r := RemoteFileWorkflowStore{store: store}
-	assert.NoError(t, r.Put(ctx, expected, location))
-	actual, err := r.Get(ctx, location)
-	assert.NoError(t, err)
-	assert.Equal(t, expected, actual)
+		ctx := context.TODO()
+		location := storage.DataReference("somekey/file.json")
+		r := RemoteFileWorkflowStore{store: store}
+		assert.NoError(t, r.PutFlyteWorkflowCRD(ctx, expected, location))
+		actual, err := r.GetWorkflowCRD(ctx, location)
+		assert.NoError(t, err)
+		assert.Equal(t, expected, actual)
+	})
+	t.Run("cache compiled workflow", func(t *testing.T) {
+		expected := &core.CompiledWorkflowClosure{
+			Primary: &core.CompiledWorkflow{
+				Template: &core.WorkflowTemplate{
+					Id: &core.Identifier{
+						ResourceType: core.ResourceType_WORKFLOW,
+						Project:      "proj",
+						Domain:       "domain",
+						Name:         "name",
+						Version:      "version",
+					},
+				},
+			},
+		}
+
+		ctx := context.TODO()
+		location := storage.DataReference("somekey/dynamic_compiled.pb")
+		r := RemoteFileWorkflowStore{store: store}
+		assert.NoError(t, r.PutCompiledFlyteWorkflow(ctx, expected, location))
+		actual, err := r.GetCompiledWorkflow(ctx, location)
+		assert.NoError(t, err)
+		assert.True(t, proto.Equal(expected, actual))
+	})
 }
diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go
index 5ffa85a2ce..129c3f4c48 100644
--- a/pkg/controller/nodes/transformers.go
+++ b/pkg/controller/nodes/transformers.go
@@ -56,6 +56,8 @@ func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase {
 		return core.NodeExecution_QUEUED
 	case handler.EPhaseRunning, handler.EPhaseRetryableFailure:
 		return core.NodeExecution_RUNNING
+	case handler.EPhaseDynamicRunning:
+		return core.NodeExecution_DYNAMIC_RUNNING
 	case handler.EPhaseSkip:
 		return core.NodeExecution_SKIPPED
 	case handler.EPhaseSuccess:
@@ -85,9 +87,15 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
 		return nil, err
 	}
 
+	phase := ToNodeExecEventPhase(info.GetPhase())
+	if eventVersion < v1alpha1.EventVersion2 && phase == core.NodeExecution_DYNAMIC_RUNNING {
+		// For older workflow event versions we lump dynamic running with running.
+		phase = core.NodeExecution_RUNNING
+	}
+
 	nev := &event.NodeExecutionEvent{
 		Id:         nodeExecID,
-		Phase:      ToNodeExecEventPhase(info.GetPhase()),
+		Phase:      phase,
 		InputUri:   reader.GetInputPath().String(),
 		OccurredAt: occurredTime,
 	}
@@ -146,6 +154,8 @@ func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error) {
 		return v1alpha1.NodePhaseQueued, nil
 	case handler.EPhaseRunning:
 		return v1alpha1.NodePhaseRunning, nil
+	case handler.EPhaseDynamicRunning:
+		return v1alpha1.NodePhaseDynamicRunning, nil
 	case handler.EPhaseRetryableFailure:
 		return v1alpha1.NodePhaseRetryableFailure, nil
 	case handler.EPhaseSkip:
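// Reviewer sketch (hypothetical, not part of this diff): how the new phase
// flows through the two transformers above, assuming eventVersion is in
// scope as it is inside ToNodeExecutionEvent.
//
//	nodePhase, _ := ToNodePhase(handler.EPhaseDynamicRunning)
//	// nodePhase == v1alpha1.NodePhaseDynamicRunning
//
//	eventPhase := ToNodeExecEventPhase(handler.EPhaseDynamicRunning)
//	// eventPhase == core.NodeExecution_DYNAMIC_RUNNING
//
//	// Consumers on event versions older than EventVersion2 never see the
//	// new enum value; it is downgraded to plain RUNNING before emission.
//	if eventVersion < v1alpha1.EventVersion2 && eventPhase == core.NodeExecution_DYNAMIC_RUNNING {
//		eventPhase = core.NodeExecution_RUNNING
//	}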