Skip to content

Commit

Permalink
Make default RawOutputDataConfig overridable (flyteorg#371)
Browse files Browse the repository at this point in the history
* Make default RawOutputDataConfig overridable

Signed-off-by: Kevin Su <[email protected]>

* Fixed tests

Signed-off-by: Kevin Su <[email protected]>

* update single task execution

Signed-off-by: Kevin Su <[email protected]>

* Updated tests

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Mar 15, 2022
1 parent 09db4d3 commit 1836fb5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.22.2
github.com/flyteorg/flyteidl v0.23.1
github.com/flyteorg/flyteplugins v0.9.1
github.com/flyteorg/flytepropeller v0.16.14
github.com/flyteorg/flytestdlib v0.4.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.18/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.2 h1:cH1amuHV2AjUAJ7RuQOzrgeeRGEzhNV8Is3kTAIPS4U=
github.com/flyteorg/flyteidl v0.22.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.23.1 h1:gSF/7VEhsClN3aCPj0lqCTAOpzqZD5rbFx6IUAtY2pg=
github.com/flyteorg/flyteidl v0.23.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.9.1 h1:Z0gxSvG7LeI+COfEmuzkhz9RnJ4E5wWUcjj5qh1uKuw=
github.com/flyteorg/flyteplugins v0.9.1/go.mod h1:OEGQztPFDJG4DV7tS9lDsRRd521iUINn5dcsBf6bW5k=
github.com/flyteorg/flytepropeller v0.16.14 h1:zG+UnfZLPCQdwh7ORm3BNwXlb6Sp2Wwa7I7NnZYcPDw=
Expand Down
13 changes: 11 additions & 2 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,11 @@ func (m *ExecutionManager) launchSingleTaskExecution(
annotations = requestSpec.Annotations.Values
}

rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig
if requestSpec.RawOutputDataConfig != nil {
rawOutputDataConfig = requestSpec.RawOutputDataConfig
}

resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole)
executionParameters := workflowengineInterfaces.ExecutionParameters{
Expand All @@ -579,7 +584,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: launchPlan.Spec.RawOutputDataConfig,
RawOutputDataConfig: rawOutputDataConfig,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "")
Expand Down Expand Up @@ -794,6 +799,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if err != nil {
return nil, nil, err
}
rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig
if requestSpec.RawOutputDataConfig != nil {
rawOutputDataConfig = requestSpec.RawOutputDataConfig
}

resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole)
Expand All @@ -807,7 +816,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: launchPlan.Spec.RawOutputDataConfig,
RawOutputDataConfig: rawOutputDataConfig,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
Expand Down
6 changes: 5 additions & 1 deletion pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func setDefaultLpCallbackForExecTest(repository interfaces.Repository) {
"annotation4": "4",
},
}

lpSpecBytes, _ := proto.Marshal(&lpSpec)
lpClosure := admin.LaunchPlanClosure{
ExpectedInputs: lpSpec.DefaultInputs,
Expand Down Expand Up @@ -260,12 +261,14 @@ func TestCreateExecution(t *testing.T) {
}

principal := "principal"
rawOutput := "raw_output"
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, principal, spec.Metadata.Principal)
assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix)
return nil
})
setDefaultLpCallbackForExecTest(repository)
Expand Down Expand Up @@ -334,6 +337,7 @@ func TestCreateExecution(t *testing.T) {
request.Spec.Metadata = &admin.ExecutionMetadata{
Principal: "unused - populated from authenticated context",
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}

identity := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil)
ctx := identity.WithContext(context.Background())
Expand Down Expand Up @@ -406,7 +410,6 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, admin.ExecutionMetadata_CHILD_WORKFLOW, spec.Metadata.Mode)
assert.Equal(t, "feeny", spec.Metadata.Principal)
assert.True(t, proto.Equal(&parentNodeExecutionID, spec.Metadata.ParentNodeExecution))
assert.EqualValues(t, input.ParentNodeExecutionID, 1)
assert.EqualValues(t, input.SourceExecutionID, 2)
Expand Down Expand Up @@ -2925,6 +2928,7 @@ func TestRelaunchExecution_LegacyModel(t *testing.T) {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.Nil(t, err)
assert.Equal(t, "default_raw_output", spec.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, admin.ExecutionMetadata_RELAUNCH, spec.Metadata.Mode)
assert.Equal(t, int32(admin.ExecutionMetadata_RELAUNCH), input.Mode)
assert.True(t, proto.Equal(spec.Inputs, getLegacySpec().Inputs))
Expand Down
1 change: 1 addition & 0 deletions pkg/manager/impl/testutils/mock_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func GetExecutionRequest() admin.ExecutionCreateRequest {
},
},
},
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: "default_raw_output"},
},
Inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down

0 comments on commit 1836fb5

Please sign in to comment.