Skip to content

Commit

Permalink
Cache Dynamic WF Spec (#16)
Browse files Browse the repository at this point in the history
* Cache Dynamic WF Spec

* Cleanup

* Update Deps
  • Loading branch information
EngHabu authored Sep 20, 2019
1 parent daa7e99 commit 8ab3272
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 44 deletions.
44 changes: 22 additions & 22 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,7 @@ func GetOutputErrorFile(inputDir DataReference) DataReference {
// GetFutureFile returns the fixed file name under which a dynamic task writes
// its (uncompiled) dynamic job spec.
func GetFutureFile() string {
	const futureFilename = "futures.pb"
	return futureFilename
}

// GetCompiledFutureFile returns the fixed file name under which the compiled
// form of a dynamic job spec is cached.
func GetCompiledFutureFile() string {
	const compiledFutureFilename = "futures_compiled.pb"
	return compiledFutureFilename
}
110 changes: 88 additions & 22 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ type dynamicNodeHandler struct {
// metrics holds the instruments the dynamic node handler reports against.
type metrics struct {
	// Overhead of building a dynamic workflow in memory (see newMetrics).
	buildDynamicWorkflow labeled.StopWatch
	// Overhead of downloading and unmarshaling the dynamic job spec.
	retrieveDynamicJobSpec labeled.StopWatch
	// Time taken when a pre-compiled workflow is served from the data store.
	// NOTE(review): CacheHit/CacheError use exported-style names while the
	// other fields are lowerCamel — consider renaming for consistency.
	CacheHit labeled.StopWatch
	// Count of failures to store or load a compiled workflow from the data store.
	CacheError labeled.Counter
}

// newMetrics builds the stopwatches and counters used by the dynamic node
// handler, registered under the supplied scope.
func newMetrics(scope promutils.Scope) metrics {
	m := metrics{}
	m.buildDynamicWorkflow = labeled.NewStopWatch("build_dynamic_workflow", "Overhead for building a dynamic workflow in memory.", time.Microsecond, scope)
	m.retrieveDynamicJobSpec = labeled.NewStopWatch("retrieve_dynamic_spec", "Overhead of downloading and unmarshaling dynamic job spec", time.Microsecond, scope)
	m.CacheHit = labeled.NewStopWatch("dynamic_workflow_cache_hit", "A dynamic workflow was loaded from store.", time.Microsecond, scope)
	m.CacheError = labeled.NewCounter("cache_err", "A dynamic workflow failed to store or load from data store.", scope)
	return m
}

Expand All @@ -56,11 +60,11 @@ func (e dynamicNodeHandler) ExtractOutput(ctx context.Context, w v1alpha1.Execut
return outputResolver.ExtractOutput(ctx, w, n, bindToVar)
}

func (e dynamicNodeHandler) getDynamicJobSpec(ctx context.Context, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) (*core.DynamicJobSpec, error) {
func (e dynamicNodeHandler) getDynamicJobSpec(ctx context.Context, node v1alpha1.ExecutableNode, dataDir storage.DataReference) (*core.DynamicJobSpec, error) {
t := e.metrics.retrieveDynamicJobSpec.Start(ctx)
defer t.Stop()

futuresFilePath, err := e.store.ConstructReference(ctx, nodeStatus.GetDataDir(), v1alpha1.GetFutureFile())
futuresFilePath, err := e.store.ConstructReference(ctx, dataDir, v1alpha1.GetFutureFile())
if err != nil {
logger.Warnf(ctx, "Failed to construct data path for futures file. Error: %v", err)
return nil, err
Expand Down Expand Up @@ -218,22 +222,7 @@ func (e dynamicNodeHandler) CheckNodeStatus(ctx context.Context, w v1alpha1.Exec
func (e dynamicNodeHandler) buildContextualDynamicWorkflow(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode,
previousNodeStatus v1alpha1.ExecutableNodeStatus) (dynamicWf v1alpha1.ExecutableWorkflow, status v1alpha1.ExecutableNodeStatus, isDynamic bool, err error) {

t := e.metrics.buildDynamicWorkflow.Start(ctx)
defer t.Stop()

var nStatus v1alpha1.ExecutableNodeStatus
// We will only get here if the Phase is success. The downside is that this is an overhead for all nodes that are
// not dynamic. But given that we will only check once, it should be ok.
// TODO: Check for node.is_dynamic once the IDL changes are in and SDK migration has happened.
djSpec, err := e.getDynamicJobSpec(ctx, node, previousNodeStatus)
if err != nil {
return nil, nil, false, err
}

if djSpec == nil {
return nil, status, false, nil
}

rootNodeStatus := w.GetNodeExecutionStatus(node.GetID())
if node.GetTaskID() != nil {
// TODO: This is a hack to set parent task execution id, we should move to node-node relationship.
Expand All @@ -253,29 +242,106 @@ func (e dynamicNodeHandler) buildContextualDynamicWorkflow(ctx context.Context,
nStatus = w.GetNodeExecutionStatus(node.GetID())
}

subwf, isDynamic, err := e.loadOrBuildDynamicWorkflow(ctx, w, node, previousNodeStatus.GetDataDir(), nStatus)
if err != nil {
return nil, nStatus, false, err
}

if !isDynamic {
return nil, nil, false, nil
}

return newContextualWorkflow(w, subwf, nStatus, subwf.Tasks, subwf.SubWorkflows), nStatus, isDynamic, nil
}

// buildFlyteWorkflow downloads the dynamic job spec produced by this node (if
// any), turns it into a workflow template, compiles it, and builds a
// FlyteWorkflow CRD from the compiled closure.
//
// Returns:
//   - (nil, false, nil) when no dynamic job spec exists — the node is not dynamic;
//   - (nil, true, err) when a spec exists but a build/compile step fails;
//   - (wf, true, nil) on success.
func (e dynamicNodeHandler) buildFlyteWorkflow(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode,
	dataDir storage.DataReference, nStatus v1alpha1.ExecutableNodeStatus) (compiledWf *v1alpha1.FlyteWorkflow, isDynamic bool, err error) {
	t := e.metrics.buildDynamicWorkflow.Start(ctx)
	defer t.Stop()

	// We will only get here if the Phase is success. The downside is that this is an overhead for all nodes that are
	// not dynamic. But given that we will only check once, it should be ok.
	// TODO: Check for node.is_dynamic once the IDL changes are in and SDK migration has happened.
	djSpec, err := e.getDynamicJobSpec(ctx, node, dataDir)
	if err != nil {
		return nil, false, err
	}

	// No futures file / no spec means this node is not dynamic.
	if djSpec == nil {
		return nil, false, nil
	}

	var closure *core.CompiledWorkflowClosure
	wf, err := e.buildDynamicWorkflowTemplate(ctx, djSpec, w, node, nStatus)
	if err != nil {
		return nil, true, err
	}

	compiledTasks, err := compileTasks(ctx, djSpec.Tasks)
	if err != nil {
		return nil, true, err
	}

	// TODO: This will currently fail if the WF references any launch plans
	closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, []common2.InterfaceProvider{})
	if err != nil {
		return nil, true, err
	}

	subwf, err := k8s.BuildFlyteWorkflow(closure, nil, nil, "")
	if err != nil {
		// Report isDynamic=true for consistency with the other post-detection
		// error paths above: by this point we already know the node is dynamic.
		// (Visible callers check err before isDynamic, so the flag value on the
		// error path is not decision-making.)
		return nil, true, err
	}

	return subwf, true, nil
}

// loadOrBuildDynamicWorkflow returns the compiled FlyteWorkflow for a dynamic
// node. It first tries to load a previously compiled workflow from the data
// store (cache hit); on a miss it builds and compiles the workflow via
// buildFlyteWorkflow and writes the result back so subsequent rounds can skip
// compilation. Cache read/write failures are logged and counted but never fail
// the call — the workflow is simply recompiled.
func (e dynamicNodeHandler) loadOrBuildDynamicWorkflow(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode,
	dataDir storage.DataReference, nodeStatus v1alpha1.ExecutableNodeStatus) (compiledWf *v1alpha1.FlyteWorkflow, isDynamic bool, err error) {

	// NOTE(review): this stopwatch is only stopped on the cache-hit path —
	// confirm labeled.StopWatch tolerates an unstopped watch on the miss path.
	cacheHitStopWatch := e.metrics.CacheHit.Start(ctx)
	// Check if we have compiled the workflow before:
	compiledFuturesFilePath, err := e.store.ConstructReference(ctx, nodeStatus.GetDataDir(), v1alpha1.GetCompiledFutureFile())
	if err != nil {
		logger.Warnf(ctx, "Failed to construct data path for compiled futures file. Error: %v", err)
		return nil, false, errors.Wrapf(errors.CausedByError, node.GetID(), err, "Failed to construct data path for compiled futures file.")
	}

	// If there is a cached compiled Workflow, load and return it.
	if metadata, err := e.store.Head(ctx, compiledFuturesFilePath); err != nil {
		logger.Warnf(ctx, "Failed to call head on compiled futures file. Error: %v", err)
		return nil, false, errors.Wrapf(errors.CausedByError, node.GetID(), err, "Failed to do HEAD on compiled futures file.")
	} else if metadata.Exists() {
		// It exists, load and return it
		compiledWf, err = loadCachedFlyteWorkflow(ctx, e.store, compiledFuturesFilePath)
		if err != nil {
			// A corrupt or unreadable cache entry is not fatal; fall through
			// and recompile.
			logger.Warnf(ctx, "Failed to load cached flyte workflow from [%v], this will cause the dynamic workflow to be recompiled. Error: %v",
				compiledFuturesFilePath, err)
			e.metrics.CacheError.Inc(ctx)
		} else {
			cacheHitStopWatch.Stop()
			return compiledWf, true, nil
		}
	}

	// If we haven't built this spec before, build it now and cache it.
	compiledWf, isDynamic, err = e.buildFlyteWorkflow(ctx, w, node, dataDir, nodeStatus)
	if err != nil {
		return compiledWf, isDynamic, err
	}

	if !isDynamic {
		return compiledWf, isDynamic, err
	}

	// Cache the built WF. Errors are swallowed (logged and counted): a failed
	// cache write only costs a recompile next round.
	err = cacheFlyteWorkflow(ctx, e.store, compiledWf, compiledFuturesFilePath)
	if err != nil {
		logger.Warnf(ctx, "Failed to cache flyte workflow, this will cause a cache miss next time and cause the dynamic workflow to be recompiled. Error: %v", err)
		e.metrics.CacheError.Inc(ctx)
	}

	return compiledWf, true, nil
}

func (e dynamicNodeHandler) progressDynamicWorkflow(ctx context.Context, parentNodeStatus v1alpha1.ExecutableNodeStatus,
Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/nodes/dynamic/utils.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package dynamic

import (
"bytes"
"context"
"encoding/json"

"github.com/lyft/flytestdlib/storage"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -103,3 +106,35 @@ func compileTasks(_ context.Context, tasks []*core.TaskTemplate) ([]*core.Compil

return compiledTasks, nil
}

// cacheFlyteWorkflow JSON-serializes the given workflow and writes it to the
// target reference in the raw store.
func cacheFlyteWorkflow(ctx context.Context, store storage.RawStore, wf *v1alpha1.FlyteWorkflow, target storage.DataReference) error {
	serialized, err := json.Marshal(wf)
	if err != nil {
		return err
	}

	reader := bytes.NewReader(serialized)
	return store.WriteRaw(ctx, target, int64(len(serialized)), storage.Options{}, reader)
}

// loadCachedFlyteWorkflow reads a previously cached, JSON-serialized
// FlyteWorkflow from the raw store and unmarshals it.
func loadCachedFlyteWorkflow(ctx context.Context, store storage.RawStore, source storage.DataReference) (
	*v1alpha1.FlyteWorkflow, error) {

	rawReader, err := store.ReadRaw(ctx, source)
	if err != nil {
		return nil, err
	}

	buf := bytes.NewBuffer(nil)
	_, readErr := buf.ReadFrom(rawReader)
	// Close unconditionally: the original skipped Close when ReadFrom failed,
	// leaking the reader on the error path. Read errors take precedence over
	// close errors, preserving the original success-path semantics.
	closeErr := rawReader.Close()
	if readErr != nil {
		return nil, readErr
	}
	if closeErr != nil {
		return nil, closeErr
	}

	wf := &v1alpha1.FlyteWorkflow{}
	return wf, json.Unmarshal(buf.Bytes(), wf)
}
Loading

0 comments on commit 8ab3272

Please sign in to comment.