Skip to content

Commit

Permalink
Cache dynamic workflow (flyteorg#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Apr 28, 2021
1 parent 2f891fa commit c5dde45
Show file tree
Hide file tree
Showing 19 changed files with 521 additions and 326 deletions.
2 changes: 1 addition & 1 deletion cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ColorizeNodePhase(p v1alpha1.NodePhase) string {
switch p {
case v1alpha1.NodePhaseNotYetStarted:
return p.String()
case v1alpha1.NodePhaseRunning:
case v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseDynamicRunning:
return color.YellowString("%s", p.String())
case v1alpha1.NodePhaseSucceeded:
return color.HiGreenString("%s", p.String())
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.31
github.com/flyteorg/flyteidl v0.18.39
github.com/flyteorg/flyteplugins v0.5.42
github.com/flyteorg/flytestdlib v0.3.16
github.com/flyteorg/flytestdlib v0.3.17
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-test/deep v1.0.7
Expand Down
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
Expand Down Expand Up @@ -231,14 +232,18 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.31 h1:aEu9HRT2GsiVKJsHYBEUA+zkPgCWFSokE0/kixYQiFY=
github.com/flyteorg/flyteidl v0.18.31/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.33 h1:CJFhjBoAA9Dmv8PdE4geYrupwb8mxzmHDHbCjneBiLw=
github.com/flyteorg/flyteidl v0.18.33/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.36-0.20210417001501-910f8f60f11b h1:HqCfdGY2keiyyIqOfF+QzdUt7pgHJLxw4GxTBAPvQe4=
github.com/flyteorg/flyteidl v0.18.36-0.20210417001501-910f8f60f11b/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.39 h1:AxsG5fyx7TMWRmI8Bb96nVsdC3njjrqVSlO/UmOIL7k=
github.com/flyteorg/flyteidl v0.18.39/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.42 h1:G4DRR2r8LlmkV+orXloDi1ly+M5WuvAaNlWFgGGyy3A=
github.com/flyteorg/flyteplugins v0.5.42/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.16 h1:wO/tk0KfLIpLS89GbJWG4nNADWnOse4nQbTDN/mEvzM=
github.com/flyteorg/flytestdlib v0.3.16/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU=
github.com/flyteorg/flytestdlib v0.3.17 h1:7OexDLAjTBzJNGMmKKFmUTkss0I9IFo1LdTMpvH4qqA=
github.com/flyteorg/flytestdlib v0.3.17/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
Expand Down Expand Up @@ -1231,6 +1236,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
NodePhaseRetryableFailure
NodePhaseTimingOut
NodePhaseTimedOut
NodePhaseDynamicRunning
)

func (p NodePhase) String() string {
Expand All @@ -89,6 +90,8 @@ func (p NodePhase) String() string {
return "Skipped"
case NodePhaseRetryableFailure:
return "RetryableFailure"
case NodePhaseDynamicRunning:
return "DynamicRunning"
}

return "Unknown"
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ const (
DynamicNodePhaseFailing
// This Phase implies that the Parent node is done but it needs to be finalized before progressing to the sub-nodes (or dynamically yielded nodes)
DynamicNodePhaseParentFinalizing
// This Phase implies that the Parent node has finalized and the sub-node (or dynamically yielded nodes) can now be processed.
DynamicNodePhaseParentFinalized
)

type DynamicNodeStatus struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type EventVersion int
const (
EventVersion0 EventVersion = iota
EventVersion1
EventVersion2
)

type NodeDefaults struct {
Expand Down
82 changes: 49 additions & 33 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ import (
"fmt"
"strconv"

"k8s.io/apimachinery/pkg/util/rand"

node_common "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler"
"github.com/flyteorg/flytepropeller/pkg/compiler/common"
Expand All @@ -22,15 +19,21 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task"
"github.com/flyteorg/flytepropeller/pkg/utils"
"github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"
)

type dynamicWorkflowContext struct {
execContext executors.ExecutionContext
subWorkflow v1alpha1.ExecutableWorkflow
nodeLookup executors.NodeLookup
isDynamic bool
execContext executors.ExecutionContext
subWorkflow v1alpha1.ExecutableWorkflow
subWorkflowClosure *core.CompiledWorkflowClosure
nodeLookup executors.NodeLookup
isDynamic bool
}

const dynamicWfNameTemplate = "dynamic_%s"

func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Context, djSpec *core.DynamicJobSpec,
nCtx handler.NodeExecutionContext, parentNodeStatus v1alpha1.ExecutableNodeStatus) (*core.WorkflowTemplate, error) {

Expand Down Expand Up @@ -106,8 +109,8 @@ func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Con
Id: &core.Identifier{
Project: nCtx.NodeExecutionMetadata().GetNodeExecutionID().GetExecutionId().Project,
Domain: nCtx.NodeExecutionMetadata().GetNodeExecutionID().GetExecutionId().Domain,
Name: fmt.Sprintf(dynamicWfNameTemplate, nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId),
Version: rand.String(10),
Name: rand.String(10),
ResourceType: core.ResourceType_WORKFLOW,
},
Nodes: djSpec.Nodes,
Expand All @@ -132,31 +135,42 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
dynamicNodeStatus.SetOutputDir(nCtx.NodeStatus().GetOutputDir())
dynamicNodeStatus.SetParentTaskID(execID)

// cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
// Check if we have compiled the workflow before:
// If there is a cached compiled Workflow, load and return it.
// if ok, err := f.CacheExists(ctx); err != nil {
// logger.Warnf(ctx, "Failed to call head on compiled futures file. Error: %v", err)
// return nil, false, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to do HEAD on compiled futures file.")
// } else if ok {
// // It exists, load and return it
// compiledWf, err := f.RetrieveCache(ctx)
// if err != nil {
// logger.Warnf(ctx, "Failed to load cached flyte workflow , this will cause the dynamic workflow to be recompiled. Error: %v", err)
// d.metrics.CacheError.Inc(ctx)
// } else {
// cacheHitStopWatch.Stop()
// return newContextualWorkflow(nCtx.Workflow(), compiledWf, dynamicNodeStatus, compiledWf.Tasks, compiledWf.SubWorkflows), true, nil
// }
// }
if ok, err := f.CacheExists(ctx); err != nil {
logger.Warnf(ctx, "Failed to call head on compiled workflow files. Error: %v", err)
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "Failed to do HEAD on compiled workflow files.")
} else if ok {
// It exists, load and return it
workflowCacheContents, err := f.RetrieveCache(ctx)
if err != nil {
logger.Warnf(ctx, "Failed to load cached flyte workflow, this will cause the dynamic workflow to be recompiled. Error: %v", err)
d.metrics.CacheError.Inc(ctx)
} else {
cacheHitStopWatch.Stop()
newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
if err != nil {
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID")
}
compiledWf := workflowCacheContents.WorkflowCRD
return dynamicWorkflowContext{
isDynamic: true,
subWorkflow: compiledWf,
subWorkflowClosure: workflowCacheContents.CompiledWorkflow,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus),
}, nil
}
}
d.metrics.CacheMiss.Inc(ctx)

// We know for sure that futures file was generated. Lets read it
djSpec, err := f.Read(ctx)
if err != nil {
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted")
}

var closure *core.CompiledWorkflowClosure
wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus)
if err != nil {
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template")
Expand Down Expand Up @@ -184,6 +198,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
// The reason they might be missing is because if a user yields a task that is SdkTask.fetch'ed, it should not be included
// See https://github.com/flyteorg/flyte/issues/219 for more information.

var closure *core.CompiledWorkflowClosure
closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces)
if err != nil {
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow")
Expand All @@ -194,7 +209,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow")
}

if err := f.Cache(ctx, dynamicWf); err != nil {
if err := f.Cache(ctx, dynamicWf, closure); err != nil {
logger.Errorf(ctx, "Failed to cache Dynamic workflow [%s]", err.Error())
}

Expand All @@ -205,10 +220,11 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID")
}
return dynamicWorkflowContext{
isDynamic: true,
subWorkflow: dynamicWf,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
isDynamic: true,
subWorkflow: dynamicWf,
subWorkflowClosure: closure,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
}, nil
}

Expand All @@ -228,7 +244,7 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
}

// As we do not support Failure Node, we can just return failure in this case
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)),
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)),
handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Dynamic workflow failed", Error: state.Err},
nil
}
Expand Down Expand Up @@ -278,7 +294,7 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
}
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), prevState, nil
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)), prevState, nil
}

func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, launchPlanIDs []compiler.LaunchPlanRefIdentifier) (
Expand Down
Loading

0 comments on commit c5dde45

Please sign in to comment.