Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Not stripping structure from literal types #571

Merged
merged 2 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/compiler/transformers/k8s/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
func validateInputs(nodeID common.NodeID, iface *core.TypedInterface, inputs core.LiteralMap, errs errors.CompileErrors) (ok bool) {
if iface == nil {
errs.Collect(errors.NewValueRequiredErr(nodeID, "interface"))
return
return false
}

if iface.Inputs == nil {
errs.Collect(errors.NewValueRequiredErr(nodeID, "interface.InputsRef"))
return
return false
}

varMap := make(map[string]*core.Variable, len(iface.Inputs.Variables))
Expand Down
4 changes: 3 additions & 1 deletion pkg/compiler/transformers/k8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ func StripTypeMetadata(t *core.LiteralType) *core.LiteralType {

c := *t
c.Metadata = nil
c.Structure = nil
c.Annotation = nil
// Note that we cannot strip `Structure` from the type because the dynamic node output type is used to validate the
// interface of the dynamically compiled workflow. `Structure` is used to extend type checking information on
// differnent Flyte types and is therefore required to ensure correct type validation.

switch underlyingType := c.Type.(type) {
case *core.LiteralType_UnionType:
Expand Down
1 change: 1 addition & 0 deletions pkg/compiler/transformers/k8s/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func TestStripTypeMetadata(t *testing.T) {
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_STRING,
},
Structure: &core.TypeStructure{Tag: "str"},
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas
if n := wf.Template.GetFailureNode(); n != nil {
nodes, ok := buildNodeSpec(n, tasks, errs.NewScope())
if !ok {
return
return nil, errs
}
failureN = nodes[0]
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/compiler/validators/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ func validateBranchInterface(w c.WorkflowBuilder, node c.NodeBuilder, errs error

if branch := node.GetBranchNode(); branch == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch"))
return
return nil, false
}

if ifBlock := node.GetBranchNode().IfElse; ifBlock == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch.IfElse"))
return
return nil, false
}

if ifCase := node.GetBranchNode().IfElse.Case; ifCase == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch.IfElse.Case"))
return
return nil, false
}

if thenNode := node.GetBranchNode().IfElse.Case.ThenNode; thenNode == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch.IfElse.Case.ThenNode"))
return
return nil, false
}

var outputs map[string]*flyte.Variable
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted")
}

closure, dynamicWf, workflowContext, err := d.buildDynamicWorkflow(ctx, nCtx, djSpec, dynamicNodeStatus)
closure, dynamicWf, err := d.buildDynamicWorkflow(ctx, nCtx, djSpec, dynamicNodeStatus)
if err != nil {
return workflowContext, err
return dynamicWorkflowContext{}, err
}

if err := f.Cache(ctx, dynamicWf, closure); err != nil {
Expand All @@ -222,28 +222,28 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
}

func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext,
djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, dynamicWorkflowContext, error) {
djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, error) {
wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus)
if err != nil {
return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template")
return nil, nil, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template")
}

compiledTasks, err := compileTasks(ctx, djSpec.Tasks)
if err != nil {
return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to compile dynamic tasks")
return nil, nil, errors.Wrapf(utils.ErrorCodeUser, err, "failed to compile dynamic tasks")
}

// Get the requirements, that is, a list of all the task IDs and the launch plan IDs that will be called as part of this dynamic task.
// The definition of these will need to be fetched from Admin (in order to get the interface).
requirements, err := compiler.GetRequirements(wf, djSpec.Subworkflows)
if err != nil {
return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to Get requirements for subworkflows")
return nil, nil, errors.Wrapf(utils.ErrorCodeUser, err, "failed to Get requirements for subworkflows")
}

// This method handles user vs system errors internally
launchPlanInterfaces, err := d.getLaunchPlanInterfaces(ctx, requirements.GetRequiredLaunchPlanIds())
if err != nil {
return nil, nil, dynamicWorkflowContext{}, err
return nil, nil, err
}

// TODO: In addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec.
Expand All @@ -253,15 +253,15 @@ func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nC
var closure *core.CompiledWorkflowClosure
closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces)
if err != nil {
return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow")
return nil, nil, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow")
}

dynamicWf, err := k8s.BuildFlyteWorkflow(closure, &core.LiteralMap{}, nil, "")
if err != nil {
return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow")
return nil, nil, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow")
}

return closure, dynamicWf, dynamicWorkflowContext{}, nil
return closure, dynamicWf, nil
}

func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup,
Expand Down