Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Save execution namespace in system metadata #568

Merged
merged 4 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.5.5
github.com/flyteorg/flyteidl v1.5.7
github.com/flyteorg/flyteplugins v1.0.56
github.com/flyteorg/flytepropeller v1.1.87
github.com/flyteorg/flytestdlib v1.0.15
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.5 h1:tNNhuXPog4atAMSGE2kyAg6JzYy1TvjqrrQeh1EZVHs=
github.com/flyteorg/flyteidl v1.5.5/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.7 h1:voAxMMFsKOseNFSlCyRGlpegqtQXtJjyxgsQzZg4tts=
github.com/flyteorg/flyteidl v1.5.7/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw=
github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU=
github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM=
Expand Down
22 changes: 14 additions & 8 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
}
sourceExecutionID = sourceExecutionModel.ID
requestSpec.Metadata.Principal = sourceExecutionModel.User
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
sourceExecution, err := transformers.FromExecutionModel(ctx, *sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
Expand Down Expand Up @@ -650,6 +650,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: taskIdentifier.ResourceType,
Namespace: namespace,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also make a backward compat change for older executions which dont have this namespace metadata set that it returns the project-domain which is widely used namespace

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should respect the config if we're going to assume values rather than use the default mapping

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that works. wanted to make sure we have some data that we have valid data when we call the GET api even though the model wont have saved this when creating the execution

})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -905,6 +906,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -965,7 +967,7 @@ func (m *ExecutionManager) RelaunchExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1022,7 +1024,7 @@ func (m *ExecutionManager) RecoverExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1073,7 +1075,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
return
}
// Find the reference launch plan to get the kickoff time argument
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(context.Background(),
"failed to transform execution model when emitting scheduled workflow execution stats with for "+
Expand Down Expand Up @@ -1316,7 +1318,11 @@ func (m *ExecutionManager) GetExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.GetId().GetProject(), request.GetId().GetDomain())
execution, transformerErr := transformers.FromExecutionModel(ctx, *executionModel, &transformers.ExecutionTransformerOptions{
DefaultNamespace: namespace,
})
if transformerErr != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id,
transformerErr)
Expand Down Expand Up @@ -1359,7 +1365,7 @@ func (m *ExecutionManager) GetExecutionData(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1459,7 +1465,7 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)
executionList, err := transformers.FromExecutionModels(ctx, output.Executions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down Expand Up @@ -1489,7 +1495,7 @@ func (m *ExecutionManager) ListExecutions(
func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest,
execution models.Execution) error {
// Notifications are stored in the Spec object of an admin.Execution object.
adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions)
adminExecution, err := transformers.FromExecutionModel(ctx, execution, transformers.DefaultExecutionTransformerOptions)
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
Expand Down
117 changes: 96 additions & 21 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,37 @@ func getLegacySpecBytes() []byte {
return b
}

func getExpectedLegacySpec() *admin.ExecutionSpec {
expectedLegacySpec := getLegacySpec()
expectedLegacySpec.Metadata = &admin.ExecutionMetadata{
SystemMetadata: &admin.SystemMetadata{
Namespace: "project-domain",
},
}
return expectedLegacySpec
}

func getExpectedLegacySpecBytes() []byte {
expectedLegacySpec := getExpectedLegacySpec()
b, _ := proto.Marshal(expectedLegacySpec)
return b
}

func getExpectedSpec() *admin.ExecutionSpec {
expectedSpec := testutils.GetExecutionRequest().Spec
expectedSpec.Metadata = &admin.ExecutionMetadata{
SystemMetadata: &admin.SystemMetadata{
Namespace: "project-domain",
},
}
return expectedSpec
}

func getExpectedSpecBytes() []byte {
specBytes, _ := proto.Marshal(getExpectedSpec())
return specBytes
}

func getLegacyClosure() *admin.ExecutionClosure {
return &admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
Expand Down Expand Up @@ -287,6 +318,7 @@ func TestCreateExecution(t *testing.T) {
assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix)
assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment))
assert.Equal(t, "launch_plan", input.LaunchEntity)
assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project-domain")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add one test with namespace mapping which equals just project

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is already testing that we're exercising namespace mapping config but added another test

return nil
})
setDefaultLpCallbackForExecTest(repository)
Expand Down Expand Up @@ -1167,6 +1199,49 @@ func TestCreateExecutionWithEnvs(t *testing.T) {
}
}

func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) {
request := testutils.GetExecutionRequest()
repository := getMockRepositoryForExecTest()
storageClient := getMockStorageForExecTest(context.Background())
setDefaultLpCallbackForExecTest(repository)
mockClock := clock.NewMock()
createdAt := time.Now()
mockClock.Set(createdAt)
exCreateFunc := func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project")
return nil
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil)
mockExecutor.OnID().Return("testMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)

mockNs := runtimeMocks.NamespaceMappingConfiguration{}
mockNs.OnGetNamespaceTemplate().Return("{{ project }}")
mockExecutionsConfigProvider := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(),
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs)
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())

execManager := NewExecutionManager(repository, r, mockExecutionsConfigProvider, storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

execManager.(*ExecutionManager)._clock = mockClock

response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
assert.True(t, proto.Equal(&executionIdentifier, response.Id))
}

func makeExecutionGetFunc(
t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc {
return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
Expand All @@ -1182,7 +1257,7 @@ func makeExecutionGetFunc(
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_QUEUED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -1769,7 +1844,7 @@ func TestRecoverExecution_RecoveredChildNode(t *testing.T) {
switch input.Name {
case "name":
return models.Execution{
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: existingClosureBytes,
BaseModel: models.BaseModel{
ID: referencedExecutionID,
Expand Down Expand Up @@ -2148,7 +2223,7 @@ func TestCreateWorkflowEvent(t *testing.T) {
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
assert.Equal(t, closureBytes, execution.Closure)
assert.Equal(t, specBytes, execution.Spec)
assert.Equal(t, getExpectedSpecBytes(), execution.Spec)
assert.Equal(t, startTime, *execution.StartedAt)
assert.Equal(t, duration, execution.Duration)
return nil
Expand Down Expand Up @@ -2188,7 +2263,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_FAILED.String(),
}, nil
}
Expand Down Expand Up @@ -2228,7 +2303,7 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
}, nil
}
Expand Down Expand Up @@ -2264,7 +2339,7 @@ func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_ABORTING.String(),
}, nil
}
Expand Down Expand Up @@ -2335,7 +2410,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_RUNNING.String(), execution.Phase)
assert.Equal(t, closureBytes, execution.Closure)
assert.Equal(t, specBytes, execution.Spec)
assert.Equal(t, getExpectedSpecBytes(), execution.Spec)
assert.Equal(t, occurredAt, *execution.StartedAt)
assert.Equal(t, time.Duration(0), execution.Duration)
assert.Equal(t, occurredAt, *execution.ExecutionUpdatedAt)
Expand Down Expand Up @@ -2377,7 +2452,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2420,7 +2495,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2467,7 +2542,7 @@ func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_UNDEFINED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2655,7 +2730,7 @@ func TestCreateWorkflowEvent_IncompatibleCluster(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2714,7 +2789,7 @@ func TestGetExecution(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand All @@ -2732,7 +2807,7 @@ func TestGetExecution(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&executionIdentifier, execution.Id))
assert.True(t, proto.Equal(spec, execution.Spec))
assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec))
assert.True(t, proto.Equal(&closure, execution.Closure))
}

Expand Down Expand Up @@ -2920,7 +2995,7 @@ func TestListExecutions(t *testing.T) {
Domain: domainValue,
Name: "my awesome execution",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: closureBytes,
},
{
Expand All @@ -2933,7 +3008,7 @@ func TestListExecutions(t *testing.T) {
Name: "my other execution",
},
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: closureBytes,
},
},
Expand Down Expand Up @@ -2966,7 +3041,7 @@ func TestListExecutions(t *testing.T) {
if idx == 0 {
assert.Equal(t, "my awesome execution", execution.Id.Name)
}
assert.True(t, proto.Equal(spec, execution.Spec))
assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec))
assert.True(t, proto.Equal(&closure, execution.Closure))
}
assert.Empty(t, executionList.Token)
Expand Down Expand Up @@ -3150,7 +3225,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
Spec: getExpectedSpecBytes(),
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))
}
Expand Down Expand Up @@ -3259,7 +3334,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
Spec: getExpectedSpecBytes(),
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))

Expand Down Expand Up @@ -3486,7 +3561,7 @@ func TestGetExecutionData(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -3661,7 +3736,7 @@ func TestGetExecution_Legacy(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: getLegacySpecBytes(),
Spec: getExpectedLegacySpecBytes(),
Phase: phase,
Closure: getLegacyClosureBytes(),
LaunchPlanID: uint(1),
Expand All @@ -3678,7 +3753,7 @@ func TestGetExecution_Legacy(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&executionIdentifier, execution.Id))
assert.True(t, proto.Equal(getLegacySpec(), execution.Spec))
assert.True(t, proto.Equal(getExpectedLegacySpec(), execution.Spec))
assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure))
}

Expand Down
Loading