diff --git a/flytectl/cmd/compile/compile.go b/flytectl/cmd/compile/compile.go index f64991f941b..16d1fcf9a72 100644 --- a/flytectl/cmd/compile/compile.go +++ b/flytectl/cmd/compile/compile.go @@ -81,24 +81,15 @@ func compileFromPackage(packagePath string) error { } var providers []common.InterfaceProvider - for _, plan := range plans { - providers = append(providers, compiler.NewLaunchPlanInterfaceProvider(*plan)) - } + var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{} // compile workflows - for wfName, workflow := range workflows { - - fmt.Println("\nCompiling workflow:", wfName) + for _, workflow := range workflows { + providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows) - _, err := compiler.CompileWorkflow(workflow.Template, - workflow.SubWorkflows, - compiledTasks, - providers) if err != nil { - fmt.Println(":( Error Compiling workflow:", wfName) return err } - } fmt.Println("All Workflows compiled successfully!") @@ -109,6 +100,64 @@ func compileFromPackage(packagePath string) error { return nil } +func handleWorkflow( + workflow *admin.WorkflowSpec, + compiledTasks []*core.CompiledTask, + compiledWorkflows map[string]*core.CompiledWorkflowClosure, + compiledLaunchPlanProviders []common.InterfaceProvider, + plans map[string]*admin.LaunchPlan, + workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) { + reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows) + wfName := workflow.Template.Id.Name + + // Check if all the subworkflows referenced by launchplan are compiled + for _, lpId := range reqs.GetRequiredLaunchPlanIds() { + lpWfName := plans[lpId.Name].Spec.WorkflowId.Name + missingWorkflow := workflows[lpWfName] + if compiledWorkflows[lpWfName] == nil { + // Recursively compile the missing workflow first + err := error(nil) + compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows) + if err != nil { + return nil, err + } + } + } + + fmt.Println("\nCompiling workflow:", wfName) + + wf, err := compiler.CompileWorkflow(workflow.Template, + workflow.SubWorkflows, + compiledTasks, + compiledLaunchPlanProviders) + + if err != nil { + fmt.Println(":( Error Compiling workflow:", wfName) + return nil, err + } + compiledWorkflows[wfName] = wf + + // Update the expected inputs and outputs for the launchplans which reference this workflow + for _, plan := range plans { + if plan.Spec.WorkflowId.Name == wfName { + plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs + newMap := make(map[string]*core.Parameter) + + for key, value := range wf.Primary.Template.Interface.Inputs.Variables { + newMap[key] = &core.Parameter{ + Var: value, + } + } + plan.Closure.ExpectedInputs = &core.ParameterMap{ + Parameters: newMap, + } + compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan)) + } + } + + return compiledLaunchPlanProviders, nil +} + const ( compileShort = `Validate flyte packages without registration needed.` compileLong = ` diff --git a/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz b/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz index 1a78ceaf22a..b297277ce0b 100644 Binary files a/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz and b/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz differ