Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan committed May 22, 2023
1 parent 3e42af2 commit 105acf2
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 22 deletions.
6 changes: 5 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,11 @@ func (m *ExecutionManager) GetExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.GetId().GetProject(), request.GetId().GetDomain())
execution, transformerErr := transformers.FromExecutionModel(*executionModel, &transformers.ExecutionTransformerOptions{
DefaultNamespace: namespace,
})
if transformerErr != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id,
transformerErr)
Expand Down
116 changes: 95 additions & 21 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,37 @@ func getLegacySpecBytes() []byte {
return b
}

func getExpectedLegacySpec() *admin.ExecutionSpec {
expectedLegacySpec := getLegacySpec()
expectedLegacySpec.Metadata = &admin.ExecutionMetadata{
SystemMetadata: &admin.SystemMetadata{
Namespace: "project-domain",
},
}
return expectedLegacySpec
}

func getExpectedLegacySpecBytes() []byte {
expectedLegacySpec := getExpectedLegacySpec()
b, _ := proto.Marshal(expectedLegacySpec)
return b
}

func getExpectedSpec() *admin.ExecutionSpec {
expectedSpec := testutils.GetExecutionRequest().Spec
expectedSpec.Metadata = &admin.ExecutionMetadata{
SystemMetadata: &admin.SystemMetadata{
Namespace: "project-domain",
},
}
return expectedSpec
}

func getExpectedSpecBytes() []byte {
specBytes, _ := proto.Marshal(getExpectedSpec())
return specBytes
}

func getLegacyClosure() *admin.ExecutionClosure {
return &admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
Expand Down Expand Up @@ -1168,6 +1199,49 @@ func TestCreateExecutionWithEnvs(t *testing.T) {
}
}

func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) {
request := testutils.GetExecutionRequest()
repository := getMockRepositoryForExecTest()
storageClient := getMockStorageForExecTest(context.Background())
setDefaultLpCallbackForExecTest(repository)
mockClock := clock.NewMock()
createdAt := time.Now()
mockClock.Set(createdAt)
exCreateFunc := func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project")
return nil
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil)
mockExecutor.OnID().Return("testMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)

mockNs := runtimeMocks.NamespaceMappingConfiguration{}
mockNs.OnGetNamespaceTemplate().Return("{{ project }}")
mockExecutionsConfigProvider := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(),
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs)
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())

execManager := NewExecutionManager(repository, r, mockExecutionsConfigProvider, storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

execManager.(*ExecutionManager)._clock = mockClock

response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
assert.True(t, proto.Equal(&executionIdentifier, response.Id))
}

func makeExecutionGetFunc(
t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc {
return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
Expand All @@ -1183,7 +1257,7 @@ func makeExecutionGetFunc(
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_QUEUED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -1770,7 +1844,7 @@ func TestRecoverExecution_RecoveredChildNode(t *testing.T) {
switch input.Name {
case "name":
return models.Execution{
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: existingClosureBytes,
BaseModel: models.BaseModel{
ID: referencedExecutionID,
Expand Down Expand Up @@ -2149,7 +2223,7 @@ func TestCreateWorkflowEvent(t *testing.T) {
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
assert.Equal(t, closureBytes, execution.Closure)
assert.Equal(t, specBytes, execution.Spec)
assert.Equal(t, getExpectedSpecBytes(), execution.Spec)
assert.Equal(t, startTime, *execution.StartedAt)
assert.Equal(t, duration, execution.Duration)
return nil
Expand Down Expand Up @@ -2189,7 +2263,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_FAILED.String(),
}, nil
}
Expand Down Expand Up @@ -2229,7 +2303,7 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
}, nil
}
Expand Down Expand Up @@ -2265,7 +2339,7 @@ func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_ABORTING.String(),
}, nil
}
Expand Down Expand Up @@ -2336,7 +2410,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_RUNNING.String(), execution.Phase)
assert.Equal(t, closureBytes, execution.Closure)
assert.Equal(t, specBytes, execution.Spec)
assert.Equal(t, getExpectedSpecBytes(), execution.Spec)
assert.Equal(t, occurredAt, *execution.StartedAt)
assert.Equal(t, time.Duration(0), execution.Duration)
assert.Equal(t, occurredAt, *execution.ExecutionUpdatedAt)
Expand Down Expand Up @@ -2378,7 +2452,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2421,7 +2495,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2468,7 +2542,7 @@ func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_UNDEFINED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2656,7 +2730,7 @@ func TestCreateWorkflowEvent_IncompatibleCluster(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2715,7 +2789,7 @@ func TestGetExecution(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand All @@ -2733,7 +2807,7 @@ func TestGetExecution(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&executionIdentifier, execution.Id))
assert.True(t, proto.Equal(spec, execution.Spec))
assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec))
assert.True(t, proto.Equal(&closure, execution.Closure))
}

Expand Down Expand Up @@ -2921,7 +2995,7 @@ func TestListExecutions(t *testing.T) {
Domain: domainValue,
Name: "my awesome execution",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: closureBytes,
},
{
Expand All @@ -2934,7 +3008,7 @@ func TestListExecutions(t *testing.T) {
Name: "my other execution",
},
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: closureBytes,
},
},
Expand Down Expand Up @@ -2967,7 +3041,7 @@ func TestListExecutions(t *testing.T) {
if idx == 0 {
assert.Equal(t, "my awesome execution", execution.Id.Name)
}
assert.True(t, proto.Equal(spec, execution.Spec))
assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec))
assert.True(t, proto.Equal(&closure, execution.Closure))
}
assert.Empty(t, executionList.Token)
Expand Down Expand Up @@ -3151,7 +3225,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
Spec: getExpectedSpecBytes(),
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))
}
Expand Down Expand Up @@ -3260,7 +3334,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
Spec: getExpectedSpecBytes(),
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))

Expand Down Expand Up @@ -3487,7 +3561,7 @@ func TestGetExecutionData(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -3662,7 +3736,7 @@ func TestGetExecution_Legacy(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: getLegacySpecBytes(),
Spec: getExpectedLegacySpecBytes(),
Phase: phase,
Closure: getLegacyClosureBytes(),
LaunchPlanID: uint(1),
Expand All @@ -3679,7 +3753,7 @@ func TestGetExecution_Legacy(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&executionIdentifier, execution.Id))
assert.True(t, proto.Equal(getLegacySpec(), execution.Spec))
assert.True(t, proto.Equal(getExpectedLegacySpec(), execution.Spec))
assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure))
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type CreateExecutionModelInput struct {

type ExecutionTransformerOptions struct {
TrimErrorMessage bool
DefaultNamespace string
}

var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{}
Expand Down Expand Up @@ -324,6 +325,19 @@ func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransfor
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
}
if len(opts.DefaultNamespace) > 0 {
if spec.Metadata == nil {
spec.Metadata = &admin.ExecutionMetadata{}
}
if spec.Metadata.SystemMetadata == nil {
spec.Metadata.SystemMetadata = &admin.SystemMetadata{}
}
if len(spec.GetMetadata().GetSystemMetadata().Namespace) == 0 {
logger.Infof(context.TODO(), "setting execution system metadata namespace to [%s]", opts.DefaultNamespace)
spec.Metadata.SystemMetadata.Namespace = opts.DefaultNamespace
}
}

var closure admin.ExecutionClosure
if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
Expand Down
23 changes: 23 additions & 0 deletions pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,29 @@ func TestFromExecutionModel_Error(t *testing.T) {
assert.True(t, proto.Equal(expectedExecErr, execution.Closure.GetError()))
}

func TestFromExecutionModel_OverwriteNamespace(t *testing.T) {
abortCause := "abort cause"
executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
})
executionModel := models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
Phase: core.WorkflowExecution_RUNNING.String(),
AbortCause: abortCause,
Closure: executionClosureBytes,
}
overwrittenNamespace := "ns"
execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{
DefaultNamespace: overwrittenNamespace,
})
assert.NoError(t, err)
assert.Equal(t, execution.GetSpec().GetMetadata().GetSystemMetadata().Namespace, overwrittenNamespace)
}

func TestFromExecutionModels(t *testing.T) {
spec := testutils.GetExecutionRequest().Spec
specBytes, _ := proto.Marshal(spec)
Expand Down

0 comments on commit 105acf2

Please sign in to comment.