From 105acf2a129a9e1760ac333b14f5a65135d53180 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 22 May 2023 11:41:36 -0700 Subject: [PATCH] review comments Signed-off-by: Katrina Rogan --- pkg/manager/impl/execution_manager.go | 6 +- pkg/manager/impl/execution_manager_test.go | 116 ++++++++++++++---- pkg/repositories/transformers/execution.go | 14 +++ .../transformers/execution_test.go | 23 ++++ 4 files changed, 137 insertions(+), 22 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index eec7df724..752da55dd 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -1318,7 +1318,11 @@ func (m *ExecutionManager) GetExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) + namespace := common.GetNamespaceName( + m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.GetId().GetProject(), request.GetId().GetDomain()) + execution, transformerErr := transformers.FromExecutionModel(*executionModel, &transformers.ExecutionTransformerOptions{ + DefaultNamespace: namespace, + }) if transformerErr != nil { logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, transformerErr) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 35a3f1e4f..73fabbc40 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -107,6 +107,37 @@ func getLegacySpecBytes() []byte { return b } +func getExpectedLegacySpec() *admin.ExecutionSpec { + expectedLegacySpec := getLegacySpec() + expectedLegacySpec.Metadata = &admin.ExecutionMetadata{ + SystemMetadata: &admin.SystemMetadata{ + Namespace: "project-domain", + }, + } + return expectedLegacySpec +} + +func getExpectedLegacySpecBytes() []byte { + expectedLegacySpec := getExpectedLegacySpec() + b, _ := proto.Marshal(expectedLegacySpec) + return b +} + +func getExpectedSpec() *admin.ExecutionSpec { + expectedSpec := testutils.GetExecutionRequest().Spec + expectedSpec.Metadata = &admin.ExecutionMetadata{ + SystemMetadata: &admin.SystemMetadata{ + Namespace: "project-domain", + }, + } + return expectedSpec +} + +func getExpectedSpecBytes() []byte { + specBytes, _ := proto.Marshal(getExpectedSpec()) + return specBytes +} + func getLegacyClosure() *admin.ExecutionClosure { return &admin.ExecutionClosure{ Phase: core.WorkflowExecution_RUNNING, @@ -1168,6 +1199,49 @@ func TestCreateExecutionWithEnvs(t *testing.T) { } } +func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) { + request := testutils.GetExecutionRequest() + repository := getMockRepositoryForExecTest() + storageClient := getMockStorageForExecTest(context.Background()) + setDefaultLpCallbackForExecTest(repository) + mockClock := clock.NewMock() + createdAt := time.Now() + mockClock.Set(createdAt) + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.NoError(t, err) + assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project") + return nil + } + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + + mockNs := runtimeMocks.NamespaceMappingConfiguration{} + mockNs.OnGetNamespaceTemplate().Return("{{ project }}") + mockExecutionsConfigProvider := runtimeMocks.NewMockConfigurationProvider( + testutils.GetApplicationConfigWithDefaultDomains(), + runtimeMocks.NewMockQueueConfigurationProvider( + []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), + nil, + runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs) + mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( + runtimeMocks.NewMockRegistrationValidationProvider()) + + execManager := NewExecutionManager(repository, r, mockExecutionsConfigProvider, storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + execManager.(*ExecutionManager)._clock = mockClock + + response, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + assert.True(t, proto.Equal(&executionIdentifier, response.Id)) +} + func makeExecutionGetFunc( t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { @@ -1183,7 +1257,7 @@ func makeExecutionGetFunc( BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_QUEUED.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -1770,7 +1844,7 @@ func TestRecoverExecution_RecoveredChildNode(t *testing.T) { switch input.Name { case "name": return models.Execution{ - Spec: specBytes, + Spec: getExpectedSpecBytes(), Closure: existingClosureBytes, BaseModel: models.BaseModel{ ID: referencedExecutionID, @@ -2149,7 +2223,7 @@ func TestCreateWorkflowEvent(t *testing.T) { assert.Equal(t, uint(2), execution.WorkflowID) assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase) assert.Equal(t, closureBytes, execution.Closure) - assert.Equal(t, specBytes, execution.Spec) + assert.Equal(t, getExpectedSpecBytes(), execution.Spec) assert.Equal(t, startTime, *execution.StartedAt) assert.Equal(t, duration, execution.Duration) return nil @@ -2189,7 +2263,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_FAILED.String(), }, nil } @@ -2229,7 +2303,7 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_RUNNING.String(), }, nil } @@ -2265,7 +2339,7 @@ func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_ABORTING.String(), }, nil } @@ -2336,7 +2410,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) { assert.Equal(t, uint(2), execution.WorkflowID) assert.Equal(t, core.WorkflowExecution_RUNNING.String(), execution.Phase) assert.Equal(t, closureBytes, execution.Closure) - assert.Equal(t, specBytes, execution.Spec) + assert.Equal(t, getExpectedSpecBytes(), execution.Spec) assert.Equal(t, occurredAt, *execution.StartedAt) assert.Equal(t, time.Duration(0), execution.Duration) assert.Equal(t, occurredAt, *execution.ExecutionUpdatedAt) @@ -2378,7 +2452,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_RUNNING.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2421,7 +2495,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_SUCCEEDED.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2468,7 +2542,7 @@ func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_UNDEFINED.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2656,7 +2730,7 @@ func TestCreateWorkflowEvent_IncompatibleCluster(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_RUNNING.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2715,7 +2789,7 @@ func TestGetExecution(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: phase, Closure: closureBytes, LaunchPlanID: uint(1), @@ -2733,7 +2807,7 @@ func TestGetExecution(t *testing.T) { }) assert.NoError(t, err) assert.True(t, proto.Equal(&executionIdentifier, execution.Id)) - assert.True(t, proto.Equal(spec, execution.Spec)) + assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec)) assert.True(t, proto.Equal(&closure, execution.Closure)) } @@ -2921,7 +2995,7 @@ func TestListExecutions(t *testing.T) { Domain: domainValue, Name: "my awesome execution", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Closure: closureBytes, }, { @@ -2934,7 +3008,7 @@ func TestListExecutions(t *testing.T) { Name: "my other execution", }, Phase: core.WorkflowExecution_SUCCEEDED.String(), - Spec: specBytes, + Spec: getExpectedSpecBytes(), Closure: closureBytes, }, }, @@ -2967,7 +3041,7 @@ func TestListExecutions(t *testing.T) { if idx == 0 { assert.Equal(t, "my awesome execution", execution.Id.Name) } - assert.True(t, proto.Equal(spec, execution.Spec)) + assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec)) assert.True(t, proto.Equal(&closure, execution.Closure)) } assert.Empty(t, executionList.Token) @@ -3151,7 +3225,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) { LaunchPlanID: uint(1), WorkflowID: uint(2), Closure: execClosureBytes, - Spec: specBytes, + Spec: getExpectedSpecBytes(), } assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel)) } @@ -3260,7 +3334,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro LaunchPlanID: uint(1), WorkflowID: uint(2), Closure: execClosureBytes, - Spec: specBytes, + Spec: getExpectedSpecBytes(), } assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel)) @@ -3487,7 +3561,7 @@ func TestGetExecutionData(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: phase, Closure: closureBytes, LaunchPlanID: uint(1), @@ -3662,7 +3736,7 @@ func TestGetExecution_Legacy(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: getLegacySpecBytes(), + Spec: getExpectedLegacySpecBytes(), Phase: phase, Closure: getLegacyClosureBytes(), LaunchPlanID: uint(1), @@ -3679,7 +3753,7 @@ func TestGetExecution_Legacy(t *testing.T) { }) assert.NoError(t, err) assert.True(t, proto.Equal(&executionIdentifier, execution.Id)) - assert.True(t, proto.Equal(getLegacySpec(), execution.Spec)) + assert.True(t, proto.Equal(getExpectedLegacySpec(), execution.Spec)) assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure)) } diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index 9a3fd2abf..c69bb0889 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -49,6 +49,7 @@ type CreateExecutionModelInput struct { type ExecutionTransformerOptions struct { TrimErrorMessage bool + DefaultNamespace string } var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{} @@ -324,6 +325,19 @@ func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransfor if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec") } + if len(opts.DefaultNamespace) > 0 { + if spec.Metadata == nil { + spec.Metadata = &admin.ExecutionMetadata{} + } + if spec.Metadata.SystemMetadata == nil { + spec.Metadata.SystemMetadata = &admin.SystemMetadata{} + } + if len(spec.GetMetadata().GetSystemMetadata().Namespace) == 0 { + logger.Infof(context.TODO(), "setting execution system metadata namespace to [%s]", opts.DefaultNamespace) + spec.Metadata.SystemMetadata.Namespace = opts.DefaultNamespace + } + } + var closure admin.ExecutionClosure if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index cd6e144e3..b7ff7faaa 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -602,6 +602,29 @@ func TestFromExecutionModel_Error(t *testing.T) { assert.True(t, proto.Equal(expectedExecErr, execution.Closure.GetError())) } +func TestFromExecutionModel_OverwriteNamespace(t *testing.T) { + abortCause := "abort cause" + executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_RUNNING, + }) + executionModel := models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Phase: core.WorkflowExecution_RUNNING.String(), + AbortCause: abortCause, + Closure: executionClosureBytes, + } + overwrittenNamespace := "ns" + execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{ + DefaultNamespace: overwrittenNamespace, + }) + assert.NoError(t, err) + assert.Equal(t, execution.GetSpec().GetMetadata().GetSystemMetadata().Namespace, overwrittenNamespace) +} + func TestFromExecutionModels(t *testing.T) { spec := testutils.GetExecutionRequest().Spec specBytes, _ := proto.Marshal(spec)