Skip to content

Commit

Permalink
Add Namespace Mapping Configuration (#3)
Browse files Browse the repository at this point in the history
* add namepace configuration

* add namespace files

* remove unused var

* update propeller executor config

* use propeller executor configuration

* to lowercase

* fix conflicts

* update namespace test

* remove namespaceMapping variable

* fix compile issues

* add project-domain option and remove incorrect log messages

* upd mock configuration provider

* update unit tests

* Merge logs on task execution event updates (#18)

* use fallthrough

* Correct Lint Errors and Add Documentation on Pre-Commit (#19)

* README update

* Fix lint errors

* add namepace configuration

* add namespace files

* remove unused var

* update propeller executor config

* use propeller executor configuration

* to lowercase

* fix conflicts

* update namespace test

* remove namespaceMapping variable

* fix compile issues

* add project-domain option and remove incorrect log messages

* upd mock configuration provider

* update unit tests

* use fallthrough

* fix conflicts

* fix more conflicts

* lint

* remove duplicate package

* fix lint errors
  • Loading branch information
migueltol22 authored Oct 22, 2019
1 parent b1012f4 commit 5ba9e89
Show file tree
Hide file tree
Showing 19 changed files with 254 additions and 30 deletions.
103 changes: 102 additions & 1 deletion flyteadmin/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions flyteadmin/pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
)

const namespaceFormat = "%s-%s"
const namespaceVariable = "namespace"
const templateVariableFormat = "{{ %s }}"
const replaceAllInstancesOfString = -1
Expand Down Expand Up @@ -283,7 +282,7 @@ func (c *controller) Sync(ctx context.Context) error {
var errs = make([]error, 0)
for _, project := range projects {
for _, domain := range *domains {
namespace := fmt.Sprintf(namespaceFormat, project.Identifier, domain.Name)
namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceMappingConfig(), project.Identifier, domain.Name)
err := c.syncNamespace(ctx, namespace)
if err != nil {
logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err)
Expand Down
24 changes: 24 additions & 0 deletions flyteadmin/pkg/common/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import "fmt"

type NamespaceMapping int

const namespaceFormat = "%s-%s"

const (
ProjectDomain NamespaceMapping = iota
Domain NamespaceMapping = iota
)

// GetNamespaceName returns kubernetes namespace name
func GetNamespaceName(mapping NamespaceMapping, project, domain string) string {
switch mapping {
case Domain:
return domain
case ProjectDomain:
fallthrough
default:
return fmt.Sprintf(namespaceFormat, project, domain)
}
}
25 changes: 25 additions & 0 deletions flyteadmin/pkg/common/namespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetNamespaceName(t *testing.T) {
testCases := []struct {
mapping NamespaceMapping
project string
domain string
want string
}{
{ProjectDomain, "project", "production", "project-production"},
{20 /*Dummy enum value that is not supported*/, "project", "development", "project-development"},
{Domain, "project", "production", "production"},
}

for _, tc := range testCases {
got := GetNamespaceName(tc.mapping, tc.project, tc.domain)
assert.Equal(t, got, tc.want)
}
}
8 changes: 4 additions & 4 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func getMockExecutionsConfigProvider() runtimeInterfaces.Configuration {
testutils.GetApplicationConfigWithDefaultProjects(),
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil, nil, nil)
nil, nil, nil, nil)
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())
return mockExecutionsConfigProvider
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
Tags: []string{"tag"},
},
}),
nil, nil, nil)
nil, nil, nil, nil)
configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())
mockExecutor := workflowengineMocks.NewMockExecutor()
Expand Down Expand Up @@ -1548,7 +1548,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {
&mockApplicationConfig,
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil, nil, nil)
nil, nil, nil, nil)

var myExecManager = &ExecutionManager{
db: repository,
Expand Down Expand Up @@ -1681,7 +1681,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro
&mockApplicationConfig,
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil, nil, nil)
nil, nil, nil, nil)

var myExecManager = &ExecutionManager{
db: repository,
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/manager/impl/executions/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGetQueue(t *testing.T) {
}
queueAllocator := NewQueueAllocator(runtimeMocks.NewMockConfigurationProvider(
nil, runtimeMocks.NewMockQueueConfigurationProvider(executionQueues, workflowConfigs),
nil, nil, nil))
nil, nil, nil, nil))
queueConfig := singleQueueConfiguration{
PrimaryQueue: "queue primary",
DynamicQueue: "queue dynamic",
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestGetQueueDefaults(t *testing.T) {
}
queueAllocator := NewQueueAllocator(runtimeMocks.NewMockConfigurationProvider(
nil, runtimeMocks.NewMockQueueConfigurationProvider(executionQueues, workflowConfigs), nil,
nil, nil))
nil, nil, nil))
assert.Equal(t, singleQueueConfiguration{
PrimaryQueue: "default primary",
DynamicQueue: "default dynamic",
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/launch_plan_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func getMockRepositoryForLpTest() repositories.RepositoryInterface {

func getMockConfigForLpTest() runtimeInterfaces.Configuration {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil)
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil)
return mockConfig
}

Expand Down
6 changes: 3 additions & 3 deletions flyteadmin/pkg/manager/impl/project_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var mockProjectConfigProvider = runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil)
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil)

var testDomainsForProjManager = []string{"domain", "development", "staging", "production"}

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestProjectManager_CreateProject(t *testing.T) {
}
projectManager := NewProjectManager(mockRepository,
runtimeMocks.NewMockConfigurationProvider(
getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil))
getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil, nil))
_, err := projectManager.CreateProject(context.Background(), admin.ProjectRegisterRequest{
Project: &admin.Project{
Id: "flyte-project-id",
Expand All @@ -100,7 +100,7 @@ func TestProjectManager_CreateProjectError(t *testing.T) {
}
projectManager := NewProjectManager(mockRepository,
runtimeMocks.NewMockConfigurationProvider(
getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil))
getMockApplicationConfigForProjectManagerTest(), nil, nil, nil, nil, nil))
_, err := projectManager.CreateProject(context.Background(), admin.ProjectRegisterRequest{
Project: &admin.Project{
Id: "flyte-project-id",
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const limit = 100
func getMockConfigForTaskTest() runtimeInterfaces.Configuration {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, runtimeMocks.NewMockTaskResourceConfiguration(
runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}), runtimeMocks.NewMockWhitelistConfiguration())
runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}), runtimeMocks.NewMockWhitelistConfiguration(), nil)
return mockConfig
}

Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var workflowClosureBytes, _ = proto.Marshal(&workflowClosure)

func getMockWorkflowConfigProvider() runtimeInterfaces.Configuration {
mockWorkflowConfigProvider := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil)
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil)
mockWorkflowConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())
return mockWorkflowConfigProvider
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
workflowExecutor := workflowengine.NewFlytePropeller(
applicationConfiguration.RoleNameKey,
executionCluster,
adminScope.NewSubScope("executor").NewSubScope("flytepropeller"))
adminScope.NewSubScope("executor").NewSubScope("flytepropeller"),
configuration.NamespaceMappingConfiguration())
logger.Info(context.Background(), "Successfully created a workflow executor engine")
dataStorageClient, err := storage.NewDataStore(storeConfig, adminScope.NewSubScope("storage"))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/runtime/cluster_resource_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ func (p *ClusterResourceConfigurationProvider) GetRefreshInterval() time.Duratio
return time.Minute
}

func NewNamespaceConfigurationProvider() interfaces.ClusterResourceConfiguration {
func NewClusterResourceConfigurationProvider() interfaces.ClusterResourceConfiguration {
return &ClusterResourceConfigurationProvider{}
}
8 changes: 7 additions & 1 deletion flyteadmin/pkg/runtime/configuration_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ConfigurationProvider struct {
whitelistConfiguration interfaces.WhitelistConfiguration
registrationValidationConfiguration interfaces.RegistrationValidationConfiguration
clusterResourceConfiguration interfaces.ClusterResourceConfiguration
namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration
}

func (p *ConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration {
Expand Down Expand Up @@ -43,6 +44,10 @@ func (p *ConfigurationProvider) ClusterResourceConfiguration() interfaces.Cluste
return p.clusterResourceConfiguration
}

func (p *ConfigurationProvider) NamespaceMappingConfiguration() interfaces.NamespaceMappingConfiguration {
return p.namespaceMappingConfiguration
}

func NewConfigurationProvider() interfaces.Configuration {
return &ConfigurationProvider{
applicationConfiguration: NewApplicationConfigurationProvider(),
Expand All @@ -51,6 +56,7 @@ func NewConfigurationProvider() interfaces.Configuration {
taskResourceConfiguration: NewTaskResourceProvider(),
whitelistConfiguration: NewWhitelistConfigurationProvider(),
registrationValidationConfiguration: NewRegistrationValidationProvider(),
clusterResourceConfiguration: NewNamespaceConfigurationProvider(),
clusterResourceConfiguration: NewClusterResourceConfigurationProvider(),
namespaceMappingConfiguration: NewNamespaceMappingConfigurationProvider(),
}
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/runtime/interfaces/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ type Configuration interface {
WhitelistConfiguration() WhitelistConfiguration
RegistrationValidationConfiguration() RegistrationValidationConfiguration
ClusterResourceConfiguration() ClusterResourceConfiguration
NamespaceMappingConfiguration() NamespaceMappingConfiguration
}
11 changes: 11 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/namespace_configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package interfaces

import "github.com/lyft/flyteadmin/pkg/common"

type NamespaceMappingConfig struct {
Mapping string `json:"mapping"`
}

type NamespaceMappingConfiguration interface {
GetNamespaceMappingConfig() common.NamespaceMapping
}
23 changes: 17 additions & 6 deletions flyteadmin/pkg/runtime/mocks/mock_configuration_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type MockConfigurationProvider struct {
whitelistConfiguration interfaces.WhitelistConfiguration
registrationValidationConfiguration interfaces.RegistrationValidationConfiguration
clusterResourceConfiguration interfaces.ClusterResourceConfiguration
namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration
}

func (p *MockConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration {
Expand Down Expand Up @@ -48,17 +49,27 @@ func (p *MockConfigurationProvider) AddClusterResourceConfiguration(config inter
p.clusterResourceConfiguration = config
}

func (p *MockConfigurationProvider) NamespaceMappingConfiguration() interfaces.NamespaceMappingConfiguration {
return p.namespaceMappingConfiguration
}

func (p *MockConfigurationProvider) AddNamespaceMappingConfiguration(config interfaces.NamespaceMappingConfiguration) {
p.namespaceMappingConfiguration = config
}

func NewMockConfigurationProvider(
applicationConfiguration interfaces.ApplicationConfiguration,
queueConfiguration interfaces.QueueConfiguration,
clusterConfiguration interfaces.ClusterConfiguration,
taskResourceConfiguration interfaces.TaskResourceConfiguration,
whitelistConfiguration interfaces.WhitelistConfiguration) interfaces.Configuration {
whitelistConfiguration interfaces.WhitelistConfiguration,
namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration) interfaces.Configuration {
return &MockConfigurationProvider{
applicationConfiguration: applicationConfiguration,
queueConfiguration: queueConfiguration,
clusterConfiguration: clusterConfiguration,
taskResourceConfiguration: taskResourceConfiguration,
whitelistConfiguration: whitelistConfiguration,
applicationConfiguration: applicationConfiguration,
queueConfiguration: queueConfiguration,
clusterConfiguration: clusterConfiguration,
taskResourceConfiguration: taskResourceConfiguration,
whitelistConfiguration: whitelistConfiguration,
namespaceMappingConfiguration: namespaceMappingConfiguration,
}
}
Loading

0 comments on commit 5ba9e89

Please sign in to comment.