Skip to content

Commit

Permalink
Fix: Make 'flytectl compile' consider launchplans used within workflo…
Browse files Browse the repository at this point in the history
…ws (#5463)

* Fix: Make 'flytectl compile' consider launchplans used within workflows

Signed-off-by: Dennis Keck <[email protected]>

* Add raw file for test

Signed-off-by: Dennis Keck <[email protected]>

* Add documentation on how to create a package

Signed-off-by: Dennis Keck <[email protected]>

---------

Signed-off-by: Dennis Keck <[email protected]>
  • Loading branch information
fellhorn authored Jun 17, 2024
1 parent 1abdd94 commit 791471c
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 10 deletions.
74 changes: 64 additions & 10 deletions flytectl/cmd/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,16 @@ func compileFromPackage(packagePath string) error {
return err
}

// compile workflows
for wfName, workflow := range workflows {
var providers []common.InterfaceProvider
var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{}

fmt.Println("\nCompiling workflow:", wfName)
plan := plans[wfName]
// compile workflows
for _, workflow := range workflows {
providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows)

_, err := compiler.CompileWorkflow(workflow.Template,
workflow.SubWorkflows,
compiledTasks,
[]common.InterfaceProvider{compiler.NewLaunchPlanInterfaceProvider(*plan)})
if err != nil {
fmt.Println(":( Error Compiling workflow:", wfName)
return err
}

}

fmt.Println("All Workflows compiled successfully!")
Expand All @@ -105,6 +100,65 @@ func compileFromPackage(packagePath string) error {
return nil
}

func handleWorkflow(
workflow *admin.WorkflowSpec,
compiledTasks []*core.CompiledTask,
compiledWorkflows map[string]*core.CompiledWorkflowClosure,
compiledLaunchPlanProviders []common.InterfaceProvider,
plans map[string]*admin.LaunchPlan,
workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) {
reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows)
wfName := workflow.Template.Id.Name

// Check if all the subworkflows referenced by launchplan are compiled
for i := range reqs.GetRequiredLaunchPlanIds() {
lpID := &reqs.GetRequiredLaunchPlanIds()[i]
lpWfName := plans[lpID.Name].Spec.WorkflowId.Name
missingWorkflow := workflows[lpWfName]
if compiledWorkflows[lpWfName] == nil {
// Recursively compile the missing workflow first
err := error(nil)
compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows)
if err != nil {
return nil, err
}
}
}

fmt.Println("\nCompiling workflow:", wfName)

wf, err := compiler.CompileWorkflow(workflow.Template,
workflow.SubWorkflows,
compiledTasks,
compiledLaunchPlanProviders)

if err != nil {
fmt.Println(":( Error Compiling workflow:", wfName)
return nil, err
}
compiledWorkflows[wfName] = wf

// Update the expected inputs and outputs for the launchplans which reference this workflow
for _, plan := range plans {
if plan.Spec.WorkflowId.Name == wfName {
plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs
newMap := make(map[string]*core.Parameter)

for key, value := range wf.Primary.Template.Interface.Inputs.Variables {
newMap[key] = &core.Parameter{
Var: value,
}
}
plan.Closure.ExpectedInputs = &core.ParameterMap{
Parameters: newMap,
}
compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan))
}
}

return compiledLaunchPlanProviders, nil
}

const (
compileShort = `Validate flyte packages without registration needed.`
compileLong = `
Expand Down
5 changes: 5 additions & 0 deletions flytectl/cmd/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestCompileCommand(t *testing.T) {
assert.NotNil(t, err, "calling compile with Empty file flag does not error")
}

// New packages can be created by using the following command
// pyflyte --pkgs <module> package -f
func TestCompilePackage(t *testing.T) {
// valid package contains two workflows
// with three tasks
Expand All @@ -69,4 +71,7 @@ func TestCompilePackage(t *testing.T) {
err = compileFromPackage("testdata/invalidworkflow.tgz")
assert.NotNil(t, err, "unable to handle invalid workflow")

// testing workflows with launchplans used within workflow
err = compileFromPackage("testdata/launchplan-in-wf.tgz")
assert.Nil(t, err, "unable to compile workflow with launchplans used within workflow")
}
16 changes: 16 additions & 0 deletions flytectl/cmd/compile/testdata/launchplan-in-wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Tests that a LaunchPlan with inputs can be used in a workflow for flytectl compile
from flytekit import LaunchPlan, task, workflow

@task
def my_task(num: int) -> int:
return num + 1


@workflow
def inner_workflow(num: int) -> int:
return my_task(num=num)


@workflow
def outer_workflow() -> int:
return LaunchPlan.get_or_create(inner_workflow, "name_override", default_inputs={"num": 42})()
Binary file added flytectl/cmd/compile/testdata/launchplan-in-wf.tgz
Binary file not shown.

0 comments on commit 791471c

Please sign in to comment.