diff --git a/flytepropeller/Gopkg.lock b/flytepropeller/Gopkg.lock index 184534eed4..496ae1dc2b 100644 --- a/flytepropeller/Gopkg.lock +++ b/flytepropeller/Gopkg.lock @@ -61,7 +61,7 @@ version = "1.0.0" [[projects]] - digest = "1:3cb5a7438cd06b18f192433cdb375f557c3c2eb6d97b88f0eb27c42f65c6b4dd" + digest = "1:43785d5148b719a3f30dbda83a2e2f46d56ac6c5de83a14802e3fea7ffd0a5b9" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -105,8 +105,8 @@ "service/sts/stsiface", ] pruneopts = "" - revision = "b4225bdde03b756685c89b7db31fe2ffac6d9234" - version = "v1.24.0" + revision = "86666b1e15ce3072f0a5e22d7988d8d195c44611" + version = "v1.24.2" [[projects]] branch = "master" @@ -466,7 +466,7 @@ version = "v0.14.1" [[projects]] - digest = "1:ab33b506292441cab3455961ad86e9e5e81989b2f6089d4437d2b15a6d417dc2" + digest = "1:b8a5dd502461caca270a001089dd5dda700a409c79748cef84742e27f03ed83a" name = "github.com/lyft/flyteplugins" packages = [ "go/tasks", @@ -488,9 +488,9 @@ "go/tasks/v1/utils", ] pruneopts = "" - revision = "31b0fb21c6bf86f9e5e81297d9ad3de3feaea6e6" + revision = "5592b9fb09a8f67cd0086ef1c623aa1cb1a2030f" source = "https://github.com/lyft/flyteplugins" - version = "v0.1.7" + version = "v0.1.8" [[projects]] digest = "1:b8860863eb1eb7fe4f7d2b3c6b34b89aaedeaae88d761d2c9a4eccc46262abcc" @@ -738,12 +738,12 @@ version = "v1.1.0" [[projects]] - digest = "1:1bff633980ce46a718d753a3bc7e3eb6d7e4df34bc7e4a9869e71ccb4314dc40" + digest = "1:688428eeb1ca80d92599eb3254bdf91b51d7e232fead3a73844c1f201a281e51" name = "github.com/spf13/pflag" packages = ["."] pruneopts = "" - revision = "e8f29969b682c41a730f8f08b76033b120498464" - version = "v1.0.4" + revision = "2e9d26c8c37aae03e3f9d4e90b7116f5accb7cab" + version = "v1.0.5" [[projects]] digest = "1:c25a789c738f7cc8ec7f34026badd4e117853f329334a5aa45cf5d0727d7d442" @@ -837,7 +837,7 @@ [[projects]] branch = "master" - digest = "1:af83d44f1195692d76697f62af82329be4d55b100d33b9b3db8f5d0f44563fb9" + digest = "1:ce26d94b8841936fff59bb524f4b96ac434f411b780b3aa784da90ee96ae2367" name = "golang.org/x/net" packages = [ "context", @@ -850,7 +850,7 @@ "trace", ] pruneopts = "" - revision = "c8589233b77dde5edd2205ba8a4fb5c9c2472556" + revision = "a8b05e9114ab0cb08faec337c959aed24b68bf50" [[projects]] branch = "master" @@ -868,14 +868,14 @@ [[projects]] branch = "master" - digest = "1:d75336f9fd966011ede7a692794c112e81f0cb80d6e0082ad352e1986fc7b5ee" + digest = "1:c04d252619a11f0ba51313ad9ee728c0b7bb61c34c7ab65e841a05ac350e65a0" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "b4ddaad3f8a36719f2b8bc6486c14cc468ca2bb5" + revision = "0c1ff786ef13daa914a3351c5e6b0321aed5960e" [[projects]] digest = "1:740b51a55815493a8d0f2b1e0d0ae48fe48953bf7eaf3fcc4198823bf67768c0" @@ -912,7 +912,7 @@ [[projects]] branch = "master" - digest = "1:3eccfc7625ff5137b398e7b16323d272f2423cd04985de69c79c3e0793494e8f" + digest = "1:2420676cd09e5b3bb8ffab905e706818ffa5d91af9e70a202eb36d251fed8c64" name = "golang.org/x/tools" packages = [ "go/ast/astutil", @@ -929,11 +929,11 @@ "internal/semver", ] pruneopts = "" - revision = "1cc945182204dc50f5760d9859d043a3d4e27047" + revision = "db1d4edb46856964c77d7931f8076747b3015980" [[projects]] branch = "master" - digest = "1:1163141bda0af433265042eb4345e4433f1302991b6be9bfa623bd0316004e32" + digest = "1:8fe044fdd4c04c27426bc4d3a2914fa0a03d7199c546563d04ea0d9544af54ba" name = "google.golang.org/api" packages = [ "googleapi", @@ -947,7 +947,7 @@ "transport/http/internal/propagation", ] pruneopts = "" - revision = "7439972e83a764c2a87f6fa78c4a37e8ed2db615" + revision = "634b73c1f50be990f1ba97c3f325fb7f88b13775" [[projects]] digest = "1:0568e577f790e9bd0420521cff50580f9b38165a38f217ce68f55c4bbaa97066" @@ -1259,20 +1259,20 @@ revision = "ebc107f98eab922ef99d645781b87caca01f4f48" [[projects]] - digest = "1:3063061b6514ad2666c4fa292451685884cacf77c803e1b10b4a4fa23f7787fb" + digest = "1:7ce71844fcaaabcbe09a392902edb5790ddca3a7070ae8d20830dc6dbe2751af" name = "k8s.io/klog" packages = ["."] pruneopts = "" - revision = "3ca30a56d8a775276f9cdae009ba326fdc05af7f" - version = "v0.4.0" + revision = "2ca9ad30301bf30a8a6e0fa2110db6b8df699a91" + version = "v1.0.0" [[projects]] branch = "master" - digest = "1:71e59e355758d825c891c77bfe3ec2c0b2523b05076e96b2a2bfa804e6ac576a" + digest = "1:ad13d36fb31a3e590b143439610f1a35b4033437ebf565dbc14a72ed4bd61dfb" name = "k8s.io/kube-openapi" packages = ["pkg/util/proto"] pruneopts = "" - revision = "743ec37842bffe49dd4221d9026f30fb1d5adbc4" + revision = "0270cf2f1c1d995d34b36019a6f65d58e6e33ad4" [[projects]] digest = "1:77629c3c036454b4623e99e20f5591b9551dd81d92db616384af92435b52e9b6" diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index deea5db502..45ca152a4e 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -389,3 +389,7 @@ func GetOutputErrorFile(inputDir DataReference) DataReference { func GetFutureFile() string { return "futures.pb" } + +func GetCompiledFutureFile() string { + return "futures_compiled.pb" +} diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler.go b/flytepropeller/pkg/controller/nodes/dynamic/handler.go index fe8af1cb36..6769838de5 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler.go @@ -37,12 +37,16 @@ type dynamicNodeHandler struct { type metrics struct { buildDynamicWorkflow labeled.StopWatch retrieveDynamicJobSpec labeled.StopWatch + CacheHit labeled.StopWatch + CacheError labeled.Counter } func newMetrics(scope promutils.Scope) metrics { return metrics{ buildDynamicWorkflow: labeled.NewStopWatch("build_dynamic_workflow", "Overhead for building a dynamic workflow in memory.", time.Microsecond, scope), retrieveDynamicJobSpec: labeled.NewStopWatch("retrieve_dynamic_spec", "Overhead of downloading and unmarshaling dynamic job spec", time.Microsecond, scope), + CacheHit: labeled.NewStopWatch("dynamic_workflow_cache_hit", "A dynamic workflow was loaded from store.", time.Microsecond, scope), + CacheError: labeled.NewCounter("cache_err", "A dynamic workflow failed to store or load from data store.", scope), } } @@ -56,11 +60,11 @@ func (e dynamicNodeHandler) ExtractOutput(ctx context.Context, w v1alpha1.Execut return outputResolver.ExtractOutput(ctx, w, n, bindToVar) } -func (e dynamicNodeHandler) getDynamicJobSpec(ctx context.Context, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) (*core.DynamicJobSpec, error) { +func (e dynamicNodeHandler) getDynamicJobSpec(ctx context.Context, node v1alpha1.ExecutableNode, dataDir storage.DataReference) (*core.DynamicJobSpec, error) { t := e.metrics.retrieveDynamicJobSpec.Start(ctx) defer t.Stop() - futuresFilePath, err := e.store.ConstructReference(ctx, nodeStatus.GetDataDir(), v1alpha1.GetFutureFile()) + futuresFilePath, err := e.store.ConstructReference(ctx, dataDir, v1alpha1.GetFutureFile()) if err != nil { logger.Warnf(ctx, "Failed to construct data path for futures file. Error: %v", err) return nil, err @@ -218,22 +222,7 @@ func (e dynamicNodeHandler) CheckNodeStatus(ctx context.Context, w v1alpha1.Exec func (e dynamicNodeHandler) buildContextualDynamicWorkflow(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, previousNodeStatus v1alpha1.ExecutableNodeStatus) (dynamicWf v1alpha1.ExecutableWorkflow, status v1alpha1.ExecutableNodeStatus, isDynamic bool, err error) { - t := e.metrics.buildDynamicWorkflow.Start(ctx) - defer t.Stop() - var nStatus v1alpha1.ExecutableNodeStatus - // We will only get here if the Phase is success. The downside is that this is an overhead for all nodes that are - // not dynamic. But given that we will only check once, it should be ok. - // TODO: Check for node.is_dynamic once the IDL changes are in and SDK migration has happened. - djSpec, err := e.getDynamicJobSpec(ctx, node, previousNodeStatus) - if err != nil { - return nil, nil, false, err - } - - if djSpec == nil { - return nil, status, false, nil - } - rootNodeStatus := w.GetNodeExecutionStatus(node.GetID()) if node.GetTaskID() != nil { // TODO: This is a hack to set parent task execution id, we should move to node-node relationship. @@ -253,29 +242,106 @@ func (e dynamicNodeHandler) buildContextualDynamicWorkflow(ctx context.Context, nStatus = w.GetNodeExecutionStatus(node.GetID()) } + subwf, isDynamic, err := e.loadOrBuildDynamicWorkflow(ctx, w, node, previousNodeStatus.GetDataDir(), nStatus) + if err != nil { + return nil, nStatus, false, err + } + + if !isDynamic { + return nil, nil, false, nil + } + + return newContextualWorkflow(w, subwf, nStatus, subwf.Tasks, subwf.SubWorkflows), nStatus, isDynamic, nil +} + +func (e dynamicNodeHandler) buildFlyteWorkflow(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, + dataDir storage.DataReference, nStatus v1alpha1.ExecutableNodeStatus) (compiledWf *v1alpha1.FlyteWorkflow, isDynamic bool, err error) { + t := e.metrics.buildDynamicWorkflow.Start(ctx) + defer t.Stop() + + // We will only get here if the Phase is success. The downside is that this is an overhead for all nodes that are + // not dynamic. But given that we will only check once, it should be ok. + // TODO: Check for node.is_dynamic once the IDL changes are in and SDK migration has happened. + djSpec, err := e.getDynamicJobSpec(ctx, node, dataDir) + if err != nil { + return nil, false, err + } + + if djSpec == nil { + return nil, false, nil + } + var closure *core.CompiledWorkflowClosure wf, err := e.buildDynamicWorkflowTemplate(ctx, djSpec, w, node, nStatus) if err != nil { - return nil, nil, true, err + return nil, true, err } compiledTasks, err := compileTasks(ctx, djSpec.Tasks) if err != nil { - return nil, nil, true, err + return nil, true, err } // TODO: This will currently fail if the WF references any launch plans closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, []common2.InterfaceProvider{}) if err != nil { - return nil, nil, true, err + return nil, true, err } subwf, err := k8s.BuildFlyteWorkflow(closure, nil, nil, "") if err != nil { - return nil, nil, true, err + return nil, false, err + } + + return subwf, true, nil +} + +func (e dynamicNodeHandler) loadOrBuildDynamicWorkflow(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, + dataDir storage.DataReference, nodeStatus v1alpha1.ExecutableNodeStatus) (compiledWf *v1alpha1.FlyteWorkflow, isDynamic bool, err error) { + + cacheHitStopWatch := e.metrics.CacheHit.Start(ctx) + // Check if we have compiled the workflow before: + compiledFuturesFilePath, err := e.store.ConstructReference(ctx, nodeStatus.GetDataDir(), v1alpha1.GetCompiledFutureFile()) + if err != nil { + logger.Warnf(ctx, "Failed to construct data path for futures file. Error: %v", err) + return nil, false, errors.Wrapf(errors.CausedByError, node.GetID(), err, "Failed to construct data path for futures file.") + } + + // If there is a cached compiled Workflow, load and return it. + if metadata, err := e.store.Head(ctx, compiledFuturesFilePath); err != nil { + logger.Warnf(ctx, "Failed to call head on compiled futures file. Error: %v", err) + return nil, false, errors.Wrapf(errors.CausedByError, node.GetID(), err, "Failed to do HEAD on compiled futures file.") + } else if metadata.Exists() { + // It exists, load and return it + compiledWf, err = loadCachedFlyteWorkflow(ctx, e.store, compiledFuturesFilePath) + if err != nil { + logger.Warnf(ctx, "Failed to load cached flyte workflow from [%v], this will cause the dynamic workflow to be recompiled. Error: %v", + compiledFuturesFilePath, err) + e.metrics.CacheError.Inc(ctx) + } else { + cacheHitStopWatch.Stop() + return compiledWf, true, nil + } + } + + // If we have not build this spec before, build it now and cache it. + compiledWf, isDynamic, err = e.buildFlyteWorkflow(ctx, w, node, dataDir, nodeStatus) + if err != nil { + return compiledWf, isDynamic, err + } + + if !isDynamic { + return compiledWf, isDynamic, err + } + + // Cache the built WF. Errors are swallowed. + err = cacheFlyteWorkflow(ctx, e.store, compiledWf, compiledFuturesFilePath) + if err != nil { + logger.Warnf(ctx, "Failed to cache flyte workflow, this will cause a cache miss next time and cause the dynamic workflow to be recompiled. Error: %v", err) + e.metrics.CacheError.Inc(ctx) } - return newContextualWorkflow(w, subwf, nStatus, subwf.Tasks, subwf.SubWorkflows), nStatus, true, nil + return compiledWf, true, nil } func (e dynamicNodeHandler) progressDynamicWorkflow(ctx context.Context, parentNodeStatus v1alpha1.ExecutableNodeStatus, diff --git a/flytepropeller/pkg/controller/nodes/dynamic/utils.go b/flytepropeller/pkg/controller/nodes/dynamic/utils.go index 42c9e081c0..913d603751 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/utils.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/utils.go @@ -1,8 +1,11 @@ package dynamic import ( + "bytes" "context" + "encoding/json" + "github.com/lyft/flytestdlib/storage" "k8s.io/apimachinery/pkg/util/sets" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -103,3 +106,35 @@ func compileTasks(_ context.Context, tasks []*core.TaskTemplate) ([]*core.Compil return compiledTasks, nil } + +func cacheFlyteWorkflow(ctx context.Context, store storage.RawStore, wf *v1alpha1.FlyteWorkflow, target storage.DataReference) error { + raw, err := json.Marshal(wf) + if err != nil { + return err + } + + return store.WriteRaw(ctx, target, int64(len(raw)), storage.Options{}, bytes.NewReader(raw)) +} + +func loadCachedFlyteWorkflow(ctx context.Context, store storage.RawStore, source storage.DataReference) ( + *v1alpha1.FlyteWorkflow, error) { + + rawReader, err := store.ReadRaw(ctx, source) + if err != nil { + return nil, err + } + + buf := bytes.NewBuffer(nil) + _, err = buf.ReadFrom(rawReader) + if err != nil { + return nil, err + } + + err = rawReader.Close() + if err != nil { + return nil, err + } + + wf := &v1alpha1.FlyteWorkflow{} + return wf, json.Unmarshal(buf.Bytes(), wf) +} diff --git a/flytepropeller/pkg/controller/nodes/dynamic/utils_test.go b/flytepropeller/pkg/controller/nodes/dynamic/utils_test.go index e5df771a64..475b903514 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/utils_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/utils_test.go @@ -1,8 +1,13 @@ package dynamic import ( + "context" "testing" + "github.com/lyft/flytestdlib/promutils" + "github.com/lyft/flytestdlib/storage" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -75,3 +80,35 @@ func TestUnderlyingInterface(t *testing.T) { assert.NotNil(t, iface) assert.Equal(t, expectedIface, iface) } + +func createInmemoryStore(t testing.TB) *storage.DataStore { + cfg := storage.Config{ + Type: storage.TypeMemory, + } + + d, err := storage.NewDataStore(&cfg, promutils.NewTestScope()) + assert.NoError(t, err) + + return d +} + +func Test_cacheFlyteWorkflow(t *testing.T) { + store := createInmemoryStore(t) + expected := &v1alpha1.FlyteWorkflow{ + TypeMeta: v1.TypeMeta{}, + ObjectMeta: v1.ObjectMeta{}, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "abc", + Connections: v1alpha1.Connections{ + DownstreamEdges: map[v1alpha1.NodeID][]v1alpha1.NodeID{}, + UpstreamEdges: map[v1alpha1.NodeID][]v1alpha1.NodeID{}, + }, + }, + } + + location := storage.DataReference("somekey/file.json") + assert.NoError(t, cacheFlyteWorkflow(context.TODO(), store, expected, location)) + actual, err := loadCachedFlyteWorkflow(context.TODO(), store, location) + assert.NoError(t, err) + assert.Equal(t, expected, actual) +}