Skip to content

Commit

Permalink
Adding CheckAndFetchInputsForExecution on single task executions (fly…
Browse files Browse the repository at this point in the history
…teorg#495)

* adding CheckAndFetchInputsForExecution on single task executions

Signed-off-by: Dan Rammer <[email protected]>

* fixed unit test

Signed-off-by: Dan Rammer <[email protected]>

* fixed typo

Signed-off-by: Dan Rammer <[email protected]>

Signed-off-by: Dan Rammer <[email protected]>
  • Loading branch information
hamersaw authored Dec 13, 2022
1 parent a1804d5 commit 905e7ee
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 37 deletions.
14 changes: 13 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,18 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return nil, nil, err
}

executionInputs, err := validation.CheckAndFetchInputsForExecution(
request.Inputs,
launchPlan.Spec.FixedInputs,
launchPlan.Closure.ExpectedInputs,
)
if err != nil {
logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+
"fixed inputs: %+v and expected inputs: %+v with err %v",
request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err)
return nil, nil, err
}

name := util.GetExecutionName(request)
workflowExecutionID := core.WorkflowExecutionIdentifier{
Project: request.Project,
Expand Down Expand Up @@ -647,7 +659,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
}

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: request.Inputs,
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
Expand Down
56 changes: 20 additions & 36 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4217,6 +4217,18 @@ func TestCreateSingleTaskExecution(t *testing.T) {
return nil
})

var launchplan *models.LaunchPlan
repository.LaunchPlanRepo().(*repositoryMocks.MockLaunchPlanRepo).SetCreateCallback(func(input models.LaunchPlan) error {
launchplan = &input
return nil
})
repository.LaunchPlanRepo().(*repositoryMocks.MockLaunchPlanRepo).SetGetCallback(func(input interfaces.Identifier) (models.LaunchPlan, error) {
if launchplan == nil {
return models.LaunchPlan{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "launchplan not found")
}
return *launchplan, nil
})

mockStorage := getMockStorageForExecTest(context.Background())
workflowManager := NewWorkflowManager(
repository,
Expand Down Expand Up @@ -4255,45 +4267,17 @@ func TestCreateSingleTaskExecution(t *testing.T) {
},
},
}

marshaller := jsonpb.Marshaler{}
stringReq, ferr := marshaller.MarshalToString(&request)
_, ferr := marshaller.MarshalToString(&request)
assert.NoError(t, ferr)
println(fmt.Sprintf("req: %+v", stringReq))
_, err := execManager.CreateExecution(context.TODO(), admin.ExecutionCreateRequest{
Project: "flytekit",
Domain: "production",
Name: "singletaskexec",
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: "flytekit",
Domain: "production",
Name: "simple_task",
Version: "12345",
ResourceType: core.ResourceType_TASK,
},
AuthRole: &admin.AuthRole{
KubernetesServiceAccount: "foo",
},
},
Inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"a": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_Integer{
Integer: 999,
},
},
},
},
},
},
},
},
}, time.Now())

// test once to create an initial launchplan
_, err := execManager.CreateExecution(context.TODO(), request, time.Now())
assert.NoError(t, err)

// test again to ensure existing launchplan retrieval works
_, err = execManager.CreateExecution(context.TODO(), request, time.Now())
assert.NoError(t, err)
}

Expand Down

0 comments on commit 905e7ee

Please sign in to comment.