diff --git a/flytectl/cmd/compile/compile.go b/flytectl/cmd/compile/compile.go index 972c553e97..60235c1b91 100644 --- a/flytectl/cmd/compile/compile.go +++ b/flytectl/cmd/compile/compile.go @@ -80,21 +80,16 @@ func compileFromPackage(packagePath string) error { return err } - // compile workflows - for wfName, workflow := range workflows { + var providers []common.InterfaceProvider + var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{} - fmt.Println("\nCompiling workflow:", wfName) - plan := plans[wfName] + // compile workflows + for _, workflow := range workflows { + providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows) - _, err := compiler.CompileWorkflow(workflow.Template, - workflow.SubWorkflows, - compiledTasks, - []common.InterfaceProvider{compiler.NewLaunchPlanInterfaceProvider(*plan)}) if err != nil { - fmt.Println(":( Error Compiling workflow:", wfName) return err } - } fmt.Println("All Workflows compiled successfully!") @@ -105,6 +100,65 @@ func compileFromPackage(packagePath string) error { return nil } +func handleWorkflow( + workflow *admin.WorkflowSpec, + compiledTasks []*core.CompiledTask, + compiledWorkflows map[string]*core.CompiledWorkflowClosure, + compiledLaunchPlanProviders []common.InterfaceProvider, + plans map[string]*admin.LaunchPlan, + workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) { + reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows) + wfName := workflow.Template.Id.Name + + // Check if all the subworkflows referenced by launchplan are compiled + for i := range reqs.GetRequiredLaunchPlanIds() { + lpID := &reqs.GetRequiredLaunchPlanIds()[i] + lpWfName := plans[lpID.Name].Spec.WorkflowId.Name + missingWorkflow := workflows[lpWfName] + if compiledWorkflows[lpWfName] == nil { + // Recursively compile the missing workflow first + err := error(nil) + compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows) + if err != nil { + return nil, err + } + } + } + + fmt.Println("\nCompiling workflow:", wfName) + + wf, err := compiler.CompileWorkflow(workflow.Template, + workflow.SubWorkflows, + compiledTasks, + compiledLaunchPlanProviders) + + if err != nil { + fmt.Println(":( Error Compiling workflow:", wfName) + return nil, err + } + compiledWorkflows[wfName] = wf + + // Update the expected inputs and outputs for the launchplans which reference this workflow + for _, plan := range plans { + if plan.Spec.WorkflowId.Name == wfName { + plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs + newMap := make(map[string]*core.Parameter) + + for key, value := range wf.Primary.Template.Interface.Inputs.Variables { + newMap[key] = &core.Parameter{ + Var: value, + } + } + plan.Closure.ExpectedInputs = &core.ParameterMap{ + Parameters: newMap, + } + compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan)) + } + } + + return compiledLaunchPlanProviders, nil +} + const ( compileShort = `Validate flyte packages without registration needed.` compileLong = ` diff --git a/flytectl/cmd/compile/compile_test.go b/flytectl/cmd/compile/compile_test.go index 4004ca897b..2d91260aff 100644 --- a/flytectl/cmd/compile/compile_test.go +++ b/flytectl/cmd/compile/compile_test.go @@ -43,6 +43,8 @@ func TestCompileCommand(t *testing.T) { assert.NotNil(t, err, "calling compile with Empty file flag does not error") } +// New packages can be created by using the following command +// pyflyte --pkgs package -f func TestCompilePackage(t *testing.T) { // valid package contains two workflows // with three tasks @@ -69,4 +71,7 @@ func TestCompilePackage(t *testing.T) { err = compileFromPackage("testdata/invalidworkflow.tgz") assert.NotNil(t, err, "unable to handle invalid workflow") + // testing workflows with launchplans used within workflow + err = compileFromPackage("testdata/launchplan-in-wf.tgz") + assert.Nil(t, err, "unable to compile workflow with launchplans used within workflow") } diff --git a/flytectl/cmd/compile/testdata/launchplan-in-wf.py b/flytectl/cmd/compile/testdata/launchplan-in-wf.py new file mode 100644 index 0000000000..8a54c69f28 --- /dev/null +++ b/flytectl/cmd/compile/testdata/launchplan-in-wf.py @@ -0,0 +1,16 @@ +# Tests that a LaunchPlan with inputs can be used in a workflow for flytectl compile +from flytekit import LaunchPlan, task, workflow + +@task +def my_task(num: int) -> int: + return num + 1 + + +@workflow +def inner_workflow(num: int) -> int: + return my_task(num=num) + + +@workflow +def outer_workflow() -> int: + return LaunchPlan.get_or_create(inner_workflow, "name_override", default_inputs={"num": 42})() diff --git a/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz b/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz new file mode 100644 index 0000000000..b297277ce0 Binary files /dev/null and b/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz differ