diff --git a/Gopkg.lock b/Gopkg.lock index b7df52dfe4..0d8f59797b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -420,7 +420,62 @@ version = "v1.2.0" [[projects]] - digest = "1:6fdab8881f20d41b91983d5aa104401be6b7d28e9cfd0a1a93337ec094d877ba" + digest = "1:32d9795a0ac21fa2ed7d92ff408bb6cc00b33ac0a29ab06bef81d3f285672b47" + name = "github.com/lyft/flyteadmin" + packages = [ + "cmd/entrypoints", + "pkg/async/notifications", + "pkg/async/notifications/implementations", + "pkg/async/notifications/interfaces", + "pkg/async/notifications/mocks", + "pkg/async/schedule", + "pkg/async/schedule/aws", + "pkg/async/schedule/aws/interfaces", + "pkg/async/schedule/aws/mocks", + "pkg/async/schedule/interfaces", + "pkg/async/schedule/mocks", + "pkg/async/schedule/noop", + "pkg/clusterresource", + "pkg/common", + "pkg/common/mocks", + "pkg/config", + "pkg/data", + "pkg/data/implementations", + "pkg/data/interfaces", + "pkg/data/mocks", + "pkg/errors", + "pkg/flytek8s", + "pkg/manager/impl", + "pkg/manager/impl/executions", + "pkg/manager/impl/shared", + "pkg/manager/impl/testutils", + "pkg/manager/impl/util", + "pkg/manager/impl/validation", + "pkg/manager/interfaces", + "pkg/manager/mocks", + "pkg/repositories", + "pkg/repositories/config", + "pkg/repositories/errors", + "pkg/repositories/gormimpl", + "pkg/repositories/interfaces", + "pkg/repositories/mocks", + "pkg/repositories/models", + "pkg/repositories/transformers", + "pkg/rpc/adminservice", + "pkg/rpc/adminservice/util", + "pkg/runtime", + "pkg/runtime/interfaces", + "pkg/runtime/mocks", + "pkg/workflowengine/impl", + "pkg/workflowengine/interfaces", + "pkg/workflowengine/mocks", + ] + pruneopts = "UT" + revision = "d1c61c34f62d8ee51964f47877802d070dfa9e98" + version = "v0.1.0" + +[[projects]] + digest = "1:4c02e347457c97ee8cfafb413554854fe236d715879ac0a43743017cd179de2e" name = "github.com/lyft/flyteidl" packages = [ "clients/go/admin", @@ -1129,6 +1184,52 @@ "github.com/jinzhu/gorm", "github.com/jinzhu/gorm/dialects/postgres", "github.com/lib/pq", + "github.com/lyft/flyteadmin/cmd/entrypoints", + "github.com/lyft/flyteadmin/pkg/async/notifications", + "github.com/lyft/flyteadmin/pkg/async/notifications/implementations", + "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces", + "github.com/lyft/flyteadmin/pkg/async/notifications/mocks", + "github.com/lyft/flyteadmin/pkg/async/schedule", + "github.com/lyft/flyteadmin/pkg/async/schedule/aws", + "github.com/lyft/flyteadmin/pkg/async/schedule/aws/interfaces", + "github.com/lyft/flyteadmin/pkg/async/schedule/aws/mocks", + "github.com/lyft/flyteadmin/pkg/async/schedule/interfaces", + "github.com/lyft/flyteadmin/pkg/async/schedule/mocks", + "github.com/lyft/flyteadmin/pkg/async/schedule/noop", + "github.com/lyft/flyteadmin/pkg/clusterresource", + "github.com/lyft/flyteadmin/pkg/common", + "github.com/lyft/flyteadmin/pkg/common/mocks", + "github.com/lyft/flyteadmin/pkg/config", + "github.com/lyft/flyteadmin/pkg/data", + "github.com/lyft/flyteadmin/pkg/data/implementations", + "github.com/lyft/flyteadmin/pkg/data/interfaces", + "github.com/lyft/flyteadmin/pkg/data/mocks", + "github.com/lyft/flyteadmin/pkg/errors", + "github.com/lyft/flyteadmin/pkg/flytek8s", + "github.com/lyft/flyteadmin/pkg/manager/impl", + "github.com/lyft/flyteadmin/pkg/manager/impl/executions", + "github.com/lyft/flyteadmin/pkg/manager/impl/shared", + "github.com/lyft/flyteadmin/pkg/manager/impl/testutils", + "github.com/lyft/flyteadmin/pkg/manager/impl/util", + "github.com/lyft/flyteadmin/pkg/manager/impl/validation", + "github.com/lyft/flyteadmin/pkg/manager/interfaces", + "github.com/lyft/flyteadmin/pkg/manager/mocks", + "github.com/lyft/flyteadmin/pkg/repositories", + "github.com/lyft/flyteadmin/pkg/repositories/config", + "github.com/lyft/flyteadmin/pkg/repositories/errors", + "github.com/lyft/flyteadmin/pkg/repositories/gormimpl", + "github.com/lyft/flyteadmin/pkg/repositories/interfaces", + "github.com/lyft/flyteadmin/pkg/repositories/mocks", + "github.com/lyft/flyteadmin/pkg/repositories/models", + "github.com/lyft/flyteadmin/pkg/repositories/transformers", + "github.com/lyft/flyteadmin/pkg/rpc/adminservice", + "github.com/lyft/flyteadmin/pkg/rpc/adminservice/util", + "github.com/lyft/flyteadmin/pkg/runtime", + "github.com/lyft/flyteadmin/pkg/runtime/interfaces", + "github.com/lyft/flyteadmin/pkg/runtime/mocks", + "github.com/lyft/flyteadmin/pkg/workflowengine/impl", + "github.com/lyft/flyteadmin/pkg/workflowengine/interfaces", + "github.com/lyft/flyteadmin/pkg/workflowengine/mocks", "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin", "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core", "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event", diff --git a/pkg/clusterresource/controller.go b/pkg/clusterresource/controller.go index d1ae876a89..11bdd8f0df 100644 --- a/pkg/clusterresource/controller.go +++ b/pkg/clusterresource/controller.go @@ -35,7 +35,6 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" ) -const namespaceFormat = "%s-%s" const namespaceVariable = "namespace" const templateVariableFormat = "{{ %s }}" const replaceAllInstancesOfString = -1 @@ -283,7 +282,7 @@ func (c *controller) Sync(ctx context.Context) error { var errs = make([]error, 0) for _, project := range projects { for _, domain := range *domains { - namespace := fmt.Sprintf(namespaceFormat, project.Identifier, domain.Name) + namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceMappingConfig(), project.Identifier, domain.Name) err := c.syncNamespace(ctx, namespace) if err != nil { logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err) diff --git a/pkg/common/namespace.go b/pkg/common/namespace.go new file mode 100644 index 0000000000..41e7601c9d --- /dev/null +++ b/pkg/common/namespace.go @@ -0,0 +1,24 @@ +package common + +import "fmt" + +type NamespaceMapping int + +const namespaceFormat = "%s-%s" + +const ( + ProjectDomain NamespaceMapping = iota + Domain NamespaceMapping = iota +) + +// GetNamespaceName returns kubernetes namespace name +func GetNamespaceName(mapping NamespaceMapping, project, domain string) string { + switch mapping { + case Domain: + return domain + case ProjectDomain: + fallthrough + default: + return fmt.Sprintf(namespaceFormat, project, domain) + } +} diff --git a/pkg/common/namespace_test.go b/pkg/common/namespace_test.go new file mode 100644 index 0000000000..4a4d819f2b --- /dev/null +++ b/pkg/common/namespace_test.go @@ -0,0 +1,25 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetNamespaceName(t *testing.T) { + testCases := []struct { + mapping NamespaceMapping + project string + domain string + want string + }{ + {ProjectDomain, "project", "production", "project-production"}, + {20 /*Dummy enum value that is not supported*/, "project", "development", "project-development"}, + {Domain, "project", "production", "production"}, + } + + for _, tc := range testCases { + got := GetNamespaceName(tc.mapping, tc.project, tc.domain) + assert.Equal(t, got, tc.want) + } +} diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 7e6ca5c2e2..cc23d36fae 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -98,7 +98,7 @@ func getMockExecutionsConfigProvider() runtimeInterfaces.Configuration { testutils.GetApplicationConfigWithDefaultProjects(), runtimeMocks.NewMockQueueConfigurationProvider( []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), - nil, nil, nil) + nil, nil, nil, nil) mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( runtimeMocks.NewMockRegistrationValidationProvider()) return mockExecutionsConfigProvider @@ -328,7 +328,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) { Tags: []string{"tag"}, }, }), - nil, nil, nil) + nil, nil, nil, nil) configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( runtimeMocks.NewMockRegistrationValidationProvider()) mockExecutor := workflowengineMocks.NewMockExecutor() @@ -1548,7 +1548,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) { &mockApplicationConfig, runtimeMocks.NewMockQueueConfigurationProvider( []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), - nil, nil, nil) + nil, nil, nil, nil) var myExecManager = &ExecutionManager{ db: repository, @@ -1681,7 +1681,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro &mockApplicationConfig, runtimeMocks.NewMockQueueConfigurationProvider( []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), - nil, nil, nil) + nil, nil, nil, nil) var myExecManager = &ExecutionManager{ db: repository, diff --git a/pkg/manager/impl/executions/queues_test.go b/pkg/manager/impl/executions/queues_test.go index 2336622b16..bd9e72c375 100644 --- a/pkg/manager/impl/executions/queues_test.go +++ b/pkg/manager/impl/executions/queues_test.go @@ -67,7 +67,7 @@ func TestGetQueue(t *testing.T) { } queueAllocator := NewQueueAllocator(runtimeMocks.NewMockConfigurationProvider( nil, runtimeMocks.NewMockQueueConfigurationProvider(executionQueues, workflowConfigs), - nil, nil, nil)) + nil, nil, nil, nil)) queueConfig := singleQueueConfiguration{ PrimaryQueue: "queue primary", DynamicQueue: "queue dynamic", @@ -139,7 +139,7 @@ func TestGetQueueDefaults(t *testing.T) { } queueAllocator := NewQueueAllocator(runtimeMocks.NewMockConfigurationProvider( nil, runtimeMocks.NewMockQueueConfigurationProvider(executionQueues, workflowConfigs), nil, - nil, nil)) + nil, nil, nil)) assert.Equal(t, singleQueueConfiguration{ PrimaryQueue: "default primary", DynamicQueue: "default dynamic", diff --git a/pkg/manager/impl/launch_plan_manager_test.go b/pkg/manager/impl/launch_plan_manager_test.go index a18f227485..9fcab6f62a 100644 --- a/pkg/manager/impl/launch_plan_manager_test.go +++ b/pkg/manager/impl/launch_plan_manager_test.go @@ -58,7 +58,7 @@ func getMockRepositoryForLpTest() repositories.RepositoryInterface { func getMockConfigForLpTest() runtimeInterfaces.Configuration { mockConfig := runtimeMocks.NewMockConfigurationProvider( - testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil) + testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil) return mockConfig } diff --git a/pkg/manager/impl/project_manager_test.go b/pkg/manager/impl/project_manager_test.go index c9d70b85d7..46bfab6d93 100644 --- a/pkg/manager/impl/project_manager_test.go +++ b/pkg/manager/impl/project_manager_test.go @@ -16,7 +16,7 @@ import ( ) var mockProjectConfigProvider = runtimeMocks.NewMockConfigurationProvider( - testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil) + testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil) var testDomainsForProjManager = []string{"domain", "development", "staging", "production"} @@ -80,7 +80,7 @@ func TestProjectManager_CreateProject(t *testing.T) { } projectManager := NewProjectManager(mockRepository, runtimeMocks.NewMockConfigurationProvider( - getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil)) + getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil, nil)) _, err := projectManager.CreateProject(context.Background(), admin.ProjectRegisterRequest{ Project: &admin.Project{ Id: "flyte-project-id", @@ -100,7 +100,7 @@ func TestProjectManager_CreateProjectError(t *testing.T) { } projectManager := NewProjectManager(mockRepository, runtimeMocks.NewMockConfigurationProvider( - getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil)) + getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil, nil)) _, err := projectManager.CreateProject(context.Background(), admin.ProjectRegisterRequest{ Project: &admin.Project{ Id: "flyte-project-id", diff --git a/pkg/manager/impl/task_manager_test.go b/pkg/manager/impl/task_manager_test.go index 57bafdd9f5..938b80d505 100644 --- a/pkg/manager/impl/task_manager_test.go +++ b/pkg/manager/impl/task_manager_test.go @@ -37,7 +37,7 @@ const limit = 100 func getMockConfigForTaskTest() runtimeInterfaces.Configuration { mockConfig := runtimeMocks.NewMockConfigurationProvider( testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, runtimeMocks.NewMockTaskResourceConfiguration( - runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}), runtimeMocks.NewMockWhitelistConfiguration()) + runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}), runtimeMocks.NewMockWhitelistConfiguration(), nil) return mockConfig } diff --git a/pkg/manager/impl/workflow_manager_test.go b/pkg/manager/impl/workflow_manager_test.go index c1bcce7d7a..59e1e6496b 100644 --- a/pkg/manager/impl/workflow_manager_test.go +++ b/pkg/manager/impl/workflow_manager_test.go @@ -69,7 +69,7 @@ var workflowClosureBytes, _ = proto.Marshal(&workflowClosure) func getMockWorkflowConfigProvider() runtimeInterfaces.Configuration { mockWorkflowConfigProvider := runtimeMocks.NewMockConfigurationProvider( - testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil) + testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil) mockWorkflowConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( runtimeMocks.NewMockRegistrationValidationProvider()) return mockWorkflowConfigProvider diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index dc36a13d28..25f46aaeca 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -85,7 +85,8 @@ func NewAdminServer(kubeConfig, master string) *AdminService { workflowExecutor := workflowengine.NewFlytePropeller( applicationConfiguration.RoleNameKey, executionCluster, - adminScope.NewSubScope("executor").NewSubScope("flytepropeller")) + adminScope.NewSubScope("executor").NewSubScope("flytepropeller"), + configuration.NamespaceMappingConfiguration()) logger.Info(context.Background(), "Successfully created a workflow executor engine") dataStorageClient, err := storage.NewDataStore(storeConfig, adminScope.NewSubScope("storage")) if err != nil { diff --git a/pkg/runtime/cluster_resource_provider.go b/pkg/runtime/cluster_resource_provider.go index 94d645530c..d6b2ac40f3 100644 --- a/pkg/runtime/cluster_resource_provider.go +++ b/pkg/runtime/cluster_resource_provider.go @@ -43,6 +43,6 @@ func (p *ClusterResourceConfigurationProvider) GetRefreshInterval() time.Duratio return time.Minute } -func NewNamespaceConfigurationProvider() interfaces.ClusterResourceConfiguration { +func NewClusterResourceConfigurationProvider() interfaces.ClusterResourceConfiguration { return &ClusterResourceConfigurationProvider{} } diff --git a/pkg/runtime/configuration_provider.go b/pkg/runtime/configuration_provider.go index 79b0af8a61..ba1c237e63 100644 --- a/pkg/runtime/configuration_provider.go +++ b/pkg/runtime/configuration_provider.go @@ -13,6 +13,7 @@ type ConfigurationProvider struct { whitelistConfiguration interfaces.WhitelistConfiguration registrationValidationConfiguration interfaces.RegistrationValidationConfiguration clusterResourceConfiguration interfaces.ClusterResourceConfiguration + namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration } func (p *ConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration { @@ -43,6 +44,10 @@ func (p *ConfigurationProvider) ClusterResourceConfiguration() interfaces.Cluste return p.clusterResourceConfiguration } +func (p *ConfigurationProvider) NamespaceMappingConfiguration() interfaces.NamespaceMappingConfiguration { + return p.namespaceMappingConfiguration +} + func NewConfigurationProvider() interfaces.Configuration { return &ConfigurationProvider{ applicationConfiguration: NewApplicationConfigurationProvider(), @@ -51,6 +56,7 @@ func NewConfigurationProvider() interfaces.Configuration { taskResourceConfiguration: NewTaskResourceProvider(), whitelistConfiguration: NewWhitelistConfigurationProvider(), registrationValidationConfiguration: NewRegistrationValidationProvider(), - clusterResourceConfiguration: NewNamespaceConfigurationProvider(), + clusterResourceConfiguration: NewClusterResourceConfigurationProvider(), + namespaceMappingConfiguration: NewNamespaceMappingConfigurationProvider(), } } diff --git a/pkg/runtime/interfaces/configuration.go b/pkg/runtime/interfaces/configuration.go index 23298867b6..4c4151584f 100644 --- a/pkg/runtime/interfaces/configuration.go +++ b/pkg/runtime/interfaces/configuration.go @@ -9,4 +9,5 @@ type Configuration interface { WhitelistConfiguration() WhitelistConfiguration RegistrationValidationConfiguration() RegistrationValidationConfiguration ClusterResourceConfiguration() ClusterResourceConfiguration + NamespaceMappingConfiguration() NamespaceMappingConfiguration } diff --git a/pkg/runtime/interfaces/namespace_configuration.go b/pkg/runtime/interfaces/namespace_configuration.go new file mode 100644 index 0000000000..e3345de52b --- /dev/null +++ b/pkg/runtime/interfaces/namespace_configuration.go @@ -0,0 +1,11 @@ +package interfaces + +import "github.com/lyft/flyteadmin/pkg/common" + +type NamespaceMappingConfig struct { + Mapping string `json:"mapping"` +} + +type NamespaceMappingConfiguration interface { + GetNamespaceMappingConfig() common.NamespaceMapping +} diff --git a/pkg/runtime/mocks/mock_configuration_provider.go b/pkg/runtime/mocks/mock_configuration_provider.go index 99c36df07d..85129c981b 100644 --- a/pkg/runtime/mocks/mock_configuration_provider.go +++ b/pkg/runtime/mocks/mock_configuration_provider.go @@ -10,6 +10,7 @@ type MockConfigurationProvider struct { whitelistConfiguration interfaces.WhitelistConfiguration registrationValidationConfiguration interfaces.RegistrationValidationConfiguration clusterResourceConfiguration interfaces.ClusterResourceConfiguration + namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration } func (p *MockConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration { @@ -48,17 +49,27 @@ func (p *MockConfigurationProvider) AddClusterResourceConfiguration(config inter p.clusterResourceConfiguration = config } +func (p *MockConfigurationProvider) NamespaceMappingConfiguration() interfaces.NamespaceMappingConfiguration { + return p.namespaceMappingConfiguration +} + +func (p *MockConfigurationProvider) AddNamespaceMappingConfiguration(config interfaces.NamespaceMappingConfiguration) { + p.namespaceMappingConfiguration = config +} + func NewMockConfigurationProvider( applicationConfiguration interfaces.ApplicationConfiguration, queueConfiguration interfaces.QueueConfiguration, clusterConfiguration interfaces.ClusterConfiguration, taskResourceConfiguration interfaces.TaskResourceConfiguration, - whitelistConfiguration interfaces.WhitelistConfiguration) interfaces.Configuration { + whitelistConfiguration interfaces.WhitelistConfiguration, + namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration) interfaces.Configuration { return &MockConfigurationProvider{ - applicationConfiguration: applicationConfiguration, - queueConfiguration: queueConfiguration, - clusterConfiguration: clusterConfiguration, - taskResourceConfiguration: taskResourceConfiguration, - whitelistConfiguration: whitelistConfiguration, + applicationConfiguration: applicationConfiguration, + queueConfiguration: queueConfiguration, + clusterConfiguration: clusterConfiguration, + taskResourceConfiguration: taskResourceConfiguration, + whitelistConfiguration: whitelistConfiguration, + namespaceMappingConfiguration: namespaceMappingConfiguration, } } diff --git a/pkg/runtime/namespace_config_provider.go b/pkg/runtime/namespace_config_provider.go new file mode 100644 index 0000000000..ee5fdd2db5 --- /dev/null +++ b/pkg/runtime/namespace_config_provider.go @@ -0,0 +1,41 @@ +package runtime + +import ( + "context" + + "github.com/lyft/flyteadmin/pkg/common" + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flytestdlib/config" + "github.com/lyft/flytestdlib/logger" +) + +const ( + namespaceMappingKey = "namespace_mapping" + domainVariable = "domain" + projectDomainVariable = "project-domain" +) + +var namespaceMappingConfig = config.MustRegisterSection(namespaceMappingKey, &interfaces.NamespaceMappingConfig{}) + +type NamespaceMappingConfigurationProvider struct{} + +func (p *NamespaceMappingConfigurationProvider) GetNamespaceMappingConfig() common.NamespaceMapping { + var mapping string + if namespaceMappingConfig != nil && namespaceMappingConfig.GetConfig() != nil { + mapping = namespaceMappingConfig.GetConfig().(*interfaces.NamespaceMappingConfig).Mapping + } + + switch mapping { + case domainVariable: + return common.Domain + case projectDomainVariable: + return common.ProjectDomain + default: + logger.Warningf(context.Background(), "Unsupported value for namespace_mapping in config, defaulting to -") + return common.ProjectDomain + } +} + +func NewNamespaceMappingConfigurationProvider() interfaces.NamespaceMappingConfiguration { + return &NamespaceMappingConfigurationProvider{} +} diff --git a/pkg/workflowengine/impl/propeller_executor.go b/pkg/workflowengine/impl/propeller_executor.go index 83c7364253..fd8c76c86f 100644 --- a/pkg/workflowengine/impl/propeller_executor.go +++ b/pkg/workflowengine/impl/propeller_executor.go @@ -2,11 +2,12 @@ package impl import ( "context" - "fmt" interfaces2 "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + "github.com/lyft/flyteadmin/pkg/common" "github.com/lyft/flyteadmin/pkg/executioncluster" + runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" "github.com/lyft/flyteadmin/pkg/workflowengine/interfaces" "github.com/lyft/flytestdlib/promutils" @@ -25,8 +26,6 @@ import ( k8_api_err "k8s.io/apimachinery/pkg/api/errors" ) -const namespaceFormat = "%s-%s" - var deletePropagationBackground = v1.DeletePropagationBackground type propellerMetrics struct { @@ -44,6 +43,7 @@ type FlytePropeller struct { builder interfaces.FlyteWorkflowInterface roleNameKey string metrics propellerMetrics + config runtimeInterfaces.NamespaceMappingConfiguration } type FlyteWorkflowBuilder struct{} @@ -91,7 +91,7 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E c.metrics.InvalidExecutionID.Inc() return nil, errors.NewFlyteAdminErrorf(codes.Internal, "invalid execution id") } - namespace := fmt.Sprintf(namespaceFormat, input.ExecutionID.GetProject(), input.ExecutionID.GetDomain()) + namespace := common.GetNamespaceName(c.config.GetNamespaceMappingConfig(), input.ExecutionID.GetProject(), input.ExecutionID.GetDomain()) flyteWf, err := c.builder.BuildFlyteWorkflow(&input.WfClosure, input.Inputs, input.ExecutionID, namespace) if err != nil { c.metrics.WorkflowBuildFailure.Inc() @@ -147,7 +147,7 @@ func (c *FlytePropeller) TerminateWorkflowExecution( c.metrics.InvalidExecutionID.Inc() return errors.NewFlyteAdminErrorf(codes.Internal, "invalid execution id") } - namespace := fmt.Sprintf(namespaceFormat, input.ExecutionID.GetProject(), input.ExecutionID.GetDomain()) + namespace := common.GetNamespaceName(c.config.GetNamespaceMappingConfig(), input.ExecutionID.GetProject(), input.ExecutionID.GetDomain()) target, err := c.executionCluster.GetTarget(&executioncluster.ExecutionTargetSpec{ TargetID: input.Cluster, }) @@ -186,12 +186,13 @@ func newPropellerMetrics(scope promutils.Scope) propellerMetrics { } func NewFlytePropeller(roleNameKey string, executionCluster interfaces2.ClusterInterface, - scope promutils.Scope) interfaces.Executor { + scope promutils.Scope, configuration runtimeInterfaces.NamespaceMappingConfiguration) interfaces.Executor { return &FlytePropeller{ executionCluster: executionCluster, builder: &FlyteWorkflowBuilder{}, roleNameKey: roleNameKey, metrics: newPropellerMetrics(scope), + config: configuration, } } diff --git a/pkg/workflowengine/impl/propeller_executor_test.go b/pkg/workflowengine/impl/propeller_executor_test.go index cc2c065e7b..69e666782a 100644 --- a/pkg/workflowengine/impl/propeller_executor_test.go +++ b/pkg/workflowengine/impl/propeller_executor_test.go @@ -10,6 +10,7 @@ import ( "github.com/lyft/flyteadmin/pkg/executioncluster" cluster_mock "github.com/lyft/flyteadmin/pkg/executioncluster/mocks" + "github.com/lyft/flyteadmin/pkg/runtime" "github.com/lyft/flyteadmin/pkg/workflowengine/interfaces" @@ -33,6 +34,7 @@ import ( var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{} var scope = promutils.NewTestScope() var propellerTestMetrics = newPropellerMetrics(scope) +var config = runtime.NewConfigurationProvider().NamespaceMappingConfiguration() var roleNameKey = "iam.amazonaws.com/role" var clusterName = "C1" @@ -45,6 +47,7 @@ func getFlytePropellerForTest(execCluster interfaces2.ClusterInterface, builder builder: builder, roleNameKey: roleNameKey, metrics: propellerTestMetrics, + config: config, } }