Skip to content

Commit

Permalink
[Compiler] Add execution edges only for top level nodes (flyteorg#216)
Browse files Browse the repository at this point in the history
* Fix branch compiler

* cleanup

* fix tests

* clean up

* lint

* docs

* cleanup

* revert config change

* avoid recompiling subworkflows

* Also limit execution edges from start node

* PR Comments

* revert config.yaml

* Use a smaller test file
  • Loading branch information
EngHabu authored Jan 14, 2021
1 parent 7059e10 commit 754079c
Show file tree
Hide file tree
Showing 10 changed files with 993 additions and 30 deletions.
16 changes: 11 additions & 5 deletions pkg/compiler/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ type workflowBuilder struct {
// These are references to all subgraphs and tasks passed to CompileWorkflow. They will be passed around but will
// not show in their entirety in the final Graph. The required subset of these will be added to each subgraph as
// the compile traverses them.
allLaunchPlans map[string]c.InterfaceProvider
allTasks c.TaskIndex
allSubWorkflows c.WorkflowIndex
allLaunchPlans map[string]c.InterfaceProvider
allTasks c.TaskIndex
allSubWorkflows c.WorkflowIndex
allCompiledSubWorkflows c.WorkflowIndex
}

func (w workflowBuilder) GetFailureNode() c.Node {
Expand Down Expand Up @@ -70,8 +71,13 @@ func (w workflowBuilder) GetLaunchPlan(id c.LaunchPlanID) (wf c.InterfaceProvide
return
}

func (w workflowBuilder) UpdateSubWorkflow(id c.WorkflowID, compiledWorkflow *core.CompiledWorkflow) {
w.allSubWorkflows[id.String()] = compiledWorkflow
func (w workflowBuilder) StoreCompiledSubWorkflow(id c.WorkflowID, compiledWorkflow *core.CompiledWorkflow) {
w.allCompiledSubWorkflows[id.String()] = compiledWorkflow
}

func (w workflowBuilder) GetCompiledSubWorkflow(id c.WorkflowID) (wf *core.CompiledWorkflow, found bool) {
wf, found = w.allCompiledSubWorkflows[id.String()]
return
}

func (w workflowBuilder) GetSubWorkflow(id c.WorkflowID) (wf *core.CompiledWorkflow, found bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compiler/common/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
// A mutable workflow used during the build of the intermediate layer.
type WorkflowBuilder interface {
Workflow
UpdateSubWorkflow(id WorkflowID, compiledWorkflow *core.CompiledWorkflow)
StoreCompiledSubWorkflow(id WorkflowID, compiledWorkflow *core.CompiledWorkflow)
AddExecutionEdge(nodeFrom, nodeTo NodeID)
AddNode(n NodeBuilder, errs errors.CompileErrors) (node NodeBuilder, ok bool)
ValidateWorkflow(fg *core.CompiledWorkflow, errs errors.CompileErrors) (Workflow, bool)
Expand Down
41 changes: 41 additions & 0 deletions pkg/compiler/common/mocks/workflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 43 additions & 2 deletions pkg/compiler/common/mocks/workflow_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/compiler/common/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Workflow interface {
GetTask(id TaskID) (task Task, found bool)
GetLaunchPlan(id LaunchPlanID) (wf InterfaceProvider, found bool)
GetSubWorkflow(id WorkflowID) (wf *core.CompiledWorkflow, found bool)
GetCompiledSubWorkflow(id WorkflowID) (wf *core.CompiledWorkflow, found bool)
GetCoreWorkflow() *core.CompiledWorkflow
GetFailureNode() Node
GetNodes() NodeIndex
Expand Down
82 changes: 81 additions & 1 deletion pkg/compiler/test/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,85 @@ func marshalProto(t *testing.T, filename string, p proto.Message) {
assert.NoError(t, ioutil.WriteFile(strings.Replace(filename, filepath.Ext(filename), ".yaml", 1), b, os.ModePerm))
}

func TestDynamic(t *testing.T) {
errors.SetConfig(errors.Config{IncludeSource: true})
assert.NoError(t, filepath.Walk("testdata/dynamic", func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}

t.Run(path, func(t *testing.T) {
// If you want to debug a single use-case. Uncomment this line.
//if !strings.HasSuffix(path, "success_1.json") {
// t.SkipNow()
//}

raw, err := ioutil.ReadFile(path)
assert.NoError(t, err)
wf := &core.DynamicJobSpec{}
err = jsonpb.UnmarshalString(string(raw), wf)
if !assert.NoError(t, err) {
t.FailNow()
}

t.Log("Compiling Workflow")
compiledTasks := mustCompileTasks(t, wf.Tasks)
wfTemplate := &core.WorkflowTemplate{
Id: &core.Identifier{
Domain: "domain",
Name: "name",
Version: "version",
},
Interface: &core.TypedInterface{
Inputs: &core.VariableMap{Variables: map[string]*core.Variable{}},
Outputs: &core.VariableMap{Variables: map[string]*core.Variable{
"o0": {
Type: &core.LiteralType{
Type: &core.LiteralType_CollectionType{
CollectionType: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_INTEGER,
},
},
},
},
},
}},
},
Nodes: wf.Nodes,
Outputs: wf.Outputs,
}
compiledWfc, err := compiler.CompileWorkflow(wfTemplate, wf.Subworkflows, compiledTasks,
[]common.InterfaceProvider{})
if !assert.NoError(t, err) {
t.FailNow()
}

inputs := map[string]interface{}{}
for varName, v := range compiledWfc.Primary.Template.Interface.Inputs.Variables {
inputs[varName] = utils.MustMakeDefaultLiteralForType(v.Type)
}

flyteWf, err := k8s.BuildFlyteWorkflow(compiledWfc,
utils.MustMakeLiteral(inputs).GetMap(),
&core.WorkflowExecutionIdentifier{
Project: "hello",
Domain: "domain",
Name: "name",
},
"namespace")
if assert.NoError(t, err) {
raw, err := json.Marshal(flyteWf)
if assert.NoError(t, err) {
assert.NotEmpty(t, raw)
}
}
})

return nil
}))
}

func TestBranches(t *testing.T) {
errors.SetConfig(errors.Config{IncludeSource: true})
assert.NoError(t, filepath.Walk("testdata/branch", func(path string, info os.FileInfo, err error) error {
Expand All @@ -109,7 +188,8 @@ func TestBranches(t *testing.T) {
}

t.Run(path, func(t *testing.T) {
//if !strings.HasSuffix(path, "success_6.json") {
// If you want to debug a single use-case. Uncomment this line.
//if !strings.HasSuffix(path, "success_1.json") {
// t.SkipNow()
//}

Expand Down
Loading

0 comments on commit 754079c

Please sign in to comment.