From 67ca4a3a564079620cba3bc39709438dfd278ebd Mon Sep 17 00:00:00 2001 From: Dennis Keck <26092524+fellhorn@users.noreply.github.com> Date: Mon, 10 Jun 2024 11:30:25 +0200 Subject: [PATCH] Handle inputs in subworkflows Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> --- flytectl/cmd/compile/compile.go | 74 +++++++++++++++--- .../cmd/compile/testdata/launchplan-in-wf.tgz | Bin 777 -> 875 bytes 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/flytectl/cmd/compile/compile.go b/flytectl/cmd/compile/compile.go index f64991f941b..60235c1b91e 100644 --- a/flytectl/cmd/compile/compile.go +++ b/flytectl/cmd/compile/compile.go @@ -81,24 +81,15 @@ func compileFromPackage(packagePath string) error { } var providers []common.InterfaceProvider - for _, plan := range plans { - providers = append(providers, compiler.NewLaunchPlanInterfaceProvider(*plan)) - } + var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{} // compile workflows - for wfName, workflow := range workflows { - - fmt.Println("\nCompiling workflow:", wfName) + for _, workflow := range workflows { + providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows) - _, err := compiler.CompileWorkflow(workflow.Template, - workflow.SubWorkflows, - compiledTasks, - providers) if err != nil { - fmt.Println(":( Error Compiling workflow:", wfName) return err } - } fmt.Println("All Workflows compiled successfully!") @@ -109,6 +100,65 @@ func compileFromPackage(packagePath string) error { return nil } +func handleWorkflow( + workflow *admin.WorkflowSpec, + compiledTasks []*core.CompiledTask, + compiledWorkflows map[string]*core.CompiledWorkflowClosure, + compiledLaunchPlanProviders []common.InterfaceProvider, + plans map[string]*admin.LaunchPlan, + workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) { + reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows) + wfName := workflow.Template.Id.Name + + // Check if all the subworkflows referenced by launchplan are compiled + for i := range reqs.GetRequiredLaunchPlanIds() { + lpID := &reqs.GetRequiredLaunchPlanIds()[i] + lpWfName := plans[lpID.Name].Spec.WorkflowId.Name + missingWorkflow := workflows[lpWfName] + if compiledWorkflows[lpWfName] == nil { + // Recursively compile the missing workflow first + err := error(nil) + compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows) + if err != nil { + return nil, err + } + } + } + + fmt.Println("\nCompiling workflow:", wfName) + + wf, err := compiler.CompileWorkflow(workflow.Template, + workflow.SubWorkflows, + compiledTasks, + compiledLaunchPlanProviders) + + if err != nil { + fmt.Println(":( Error Compiling workflow:", wfName) + return nil, err + } + compiledWorkflows[wfName] = wf + + // Update the expected inputs and outputs for the launchplans which reference this workflow + for _, plan := range plans { + if plan.Spec.WorkflowId.Name == wfName { + plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs + newMap := make(map[string]*core.Parameter) + + for key, value := range wf.Primary.Template.Interface.Inputs.Variables { + newMap[key] = &core.Parameter{ + Var: value, + } + } + plan.Closure.ExpectedInputs = &core.ParameterMap{ + Parameters: newMap, + } + compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan)) + } + } + + return compiledLaunchPlanProviders, nil +} + const ( compileShort = `Validate flyte packages without registration needed.` compileLong = ` diff --git a/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz b/flytectl/cmd/compile/testdata/launchplan-in-wf.tgz index 1a78ceaf22acedd4f12d0dccec9e08b50cbb3aa4..b297277ce0b807eb705c203a0fcd6039e6072e4d 100644 GIT binary patch literal 875 zcmV-x1C;z9iwFq$#%5*$|7L7?bY(4YVPk7yXJsyQXLxo6{H z=8ny4ICHO|NA&39nbZHwof;3&qoBmxbyO5hyJ-*mnW8d=4*Mttx}DGo#O2ZRM70iE zq1)o`n8n^IjF~#4n#xSm!U*{P(e0ieH0n*i*P*>S^}8Vt#$C2b+g~4M{hOvfJTl8r zlE(;DH4)QIjTyS8Sxl^di&+*rRMK4kHST$C=kB=s#-AwoX9geSv!%T`Ujqq|0!Gpn z-oRO3k@7g3&q$(GdXKZ7A{Fq4>v@UzGExv($2qUtl9ut32rpN5@OC~|TKAk5xBUwq z1a6I&x1#f5l8??K+7(2WNVb@Rb(t0v{15&I|36zad*UbA)*i(8f9HRu8&m$*RRjE= zj{j%1EPc6UM=)u4u_M^WBsTmGUX3;cN!^!_iYn+wM(A9G!YTv*ga4n9|Ct5;eW`M<$t@_!xtpN{{3WfrmhYb@cd*!KzB&sGFJM49{=UTF>N(CIYFh-}2uR>+HJ zlDdzJ`An%WalPwk2|-B!{15(z{2xdOV{ZLlOU3_8wWj^ADh&LeivJHp{y(?)PZzmA zfFR=s|AYS_{|BBHihWxmX0wO94RD|TEo(aeHw?oB|EJ>r-?=&Y_*#1qp8K9Wd_Z<3 zNo4uUj502fcia$-&6xyZw^9E{ovbC)EYM$Zdhp#G>V35PW$_qvq8W*_36m9T9E09q_u_C);8$?m3G(L`+eZ1E)8Nt2v=F z3NDIZF?1S@oQQ$(oNZaIp7Uy>?Yo{hw-cU|JrR7fHK&O8LKG%+&u49sK`~^8cyg|H&fv z2X=8G-T|y9{tZyfKve($0000000000000000000000000007{z`2$XxUKs#T006ft Bt-$~Q literal 777 zcmV+k1NQtMiwFqb!dYel|7L7?bY(4YVPk7yXJsyQXLyD^x5~U7pg^+gM&)P4Wo6^!!x}k|>1Kx*2-0%C` z0*&)Hc6fzX-kmnCdd-?W zW(eUBQFU$CsS3xb>+xT!S1qz<^19#U?k!&(2^dHUb67W{A3;s0Fxe=;Ze`LjS~|SA^?ZP?(!x#=BMbhA|KWe|e`9&`S8z|< zq4;m<|N6rHf7KB#{GW^ee-v*N`FcE5r%}j8FX*jY3Ih!2V--^cD zM#5X2p^`jGzE-lNv}M)QM@IEyfzBveAyCAk^>e zt5f<)7hlDW=RZT&>-RjH=|p#R{V4Wf8k$xc`ZtojsB