Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

#minor Add default execution cluster option for a multi-cluster setup #477

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type RandomClusterSelector struct {
equalWeightedAllClusters random.WeightedRandomList
labelWeightedRandomMap map[string]random.WeightedRandomList
resourceManager managerInterfaces.ResourceInterface
defaultExecutionLabel string
}

func getRandSource(seed string) (rand.Source, error) {
Expand Down Expand Up @@ -111,6 +112,7 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu
return nil, err
}
}

var weightedRandomList random.WeightedRandomList
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label := resource.Attributes.GetExecutionClusterLabel().Value
Expand All @@ -123,8 +125,17 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu
} else {
logger.Debugf(ctx, "No override found for the spec %v", spec)
}
// If there is no label associated (or) if the label is invalid, choose from all enabled clusters.
// Note that if there is a valid label with zero "Enabled" clusters, we still choose from all enabled ones.

if weightedRandomList == nil {
if s.defaultExecutionLabel != "" {
if _, ok := s.labelWeightedRandomMap[s.defaultExecutionLabel]; ok {
weightedRandomList = s.labelWeightedRandomMap[s.defaultExecutionLabel]
} else {
logger.Warnf(ctx, "No cluster mapping found for the default execution label %s", s.defaultExecutionLabel)
}
}
}

if weightedRandomList == nil {
weightedRandomList = s.equalWeightedAllClusters
}
Expand All @@ -148,6 +159,9 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu

func NewRandomClusterSelector(listTargets interfaces.ListTargetsInterface, config runtime.Configuration,
db repositoryInterfaces.Repository) (interfaces.ClusterInterface, error) {

defaultExecutionLabel := config.ClusterConfiguration().GetDefaultExecutionLabel()

equalWeightedAllClusters, err := convertToRandomWeightedList(context.Background(), listTargets.GetValidTargets())
if err != nil {
return nil, err
Expand All @@ -161,5 +175,6 @@ func NewRandomClusterSelector(listTargets interfaces.ListTargetsInterface, confi
resourceManager: resources.NewResourceManager(db, config.ApplicationConfiguration()),
equalWeightedAllClusters: equalWeightedAllClusters,
ListTargetsInterface: listTargets,
defaultExecutionLabel: defaultExecutionLabel,
}, nil
}
125 changes: 117 additions & 8 deletions pkg/executioncluster/impl/random_cluster_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/stretchr/testify/assert"
)

const testProject = "project"
const testDomain = "domain"
const testWorkflow = "name"

const (
testCluster1 = "testcluster1"
testCluster2 = "testcluster2"
testCluster3 = "testcluster3"
testProject = "project"
testDomain = "domain"
testWorkflow = "name"
testCluster1 = "testcluster1"
testCluster2 = "testcluster2"
testCluster3 = "testcluster3"
clusterConfig1 = "clusters_config.yaml"
clusterConfig2 = "clusters_config2.yaml"
clusterConfig2WithDefaultLabel = "clusters_config2_default_label.yaml"
)

func initTestConfig(fileName string) error {
Expand All @@ -47,7 +49,7 @@ func initTestConfig(fileName string) error {
}

func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface {
err := initTestConfig("clusters_config.yaml")
err := initTestConfig(clusterConfig1)
assert.NoError(t, err)

db := repo_mock.NewMockRepository()
Expand Down Expand Up @@ -119,6 +121,74 @@ func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface
return randomCluster
}

func getRandomClusterSelectorWithDefaultLabelForTest(t *testing.T, configFile string) interfaces2.ClusterInterface {
err := initTestConfig(configFile)
assert.NoError(t, err)

db := repo_mock.NewMockRepository()
db.ResourceRepo().(*repo_mock.MockResourceRepo).GetFunction = func(ctx context.Context, ID repo_interface.ResourceID) (resource models.Resource, e error) {
assert.Equal(t, "EXECUTION_CLUSTER_LABEL", ID.ResourceType)
if ID.Project == "" {
return models.Resource{}, errors.NewFlyteAdminErrorf(codes.NotFound,
"Resource [%+v] not found", ID)
}
response := models.Resource{
Project: ID.Project,
Domain: ID.Domain,
Workflow: ID.Workflow,
ResourceType: ID.ResourceType,
LaunchPlan: ID.LaunchPlan,
}
if ID.Project == testProject && ID.Domain == testDomain {
matchingAttributes := &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ExecutionClusterLabel{
ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "two",
},
},
}
marshalledMatchingAttributes, _ := proto.Marshal(matchingAttributes)
response.Attributes = marshalledMatchingAttributes
}
return response, nil
}
configProvider := runtime.NewConfigurationProvider()
listTargetsProvider := mocks.ListTargetsInterface{}
validTargets := map[string]*executioncluster.ExecutionTarget{
testCluster1: {
ID: testCluster1,
Enabled: true,
},
testCluster2: {
ID: testCluster2,
Enabled: true,
},
testCluster3: {
ID: testCluster3,
Enabled: true,
},
}
targets := map[string]*executioncluster.ExecutionTarget{
testCluster1: {
ID: testCluster1,
Enabled: true,
},
testCluster2: {
ID: testCluster2,
Enabled: true,
},
testCluster3: {
ID: testCluster3,
Enabled: true,
},
}
listTargetsProvider.OnGetValidTargets().Return(validTargets)
listTargetsProvider.OnGetAllTargets().Return(targets)
randomCluster, err := NewRandomClusterSelector(&listTargetsProvider, configProvider, db)
assert.NoError(t, err)
return randomCluster
}

func TestRandomClusterSelectorGetTarget(t *testing.T) {
cluster := getRandomClusterSelectorForTest(t)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: testCluster1})
Expand Down Expand Up @@ -198,3 +268,42 @@ func TestRandomClusterSelectorGetAllValidTargets(t *testing.T) {
targets := cluster.GetValidTargets()
assert.Equal(t, 2, len(targets))
}

func TestRandomClusterSelectorGetTargetWithFallbackToDefault1(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
})
assert.Nil(t, err)
assert.Equal(t, testCluster3, target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithFallbackToDefault2(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: testDomain,
Workflow: testWorkflow,
ExecutionID: "e3",
})
assert.Nil(t, err)
assert.Equal(t, testCluster2, target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithFallbackToDefault3(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2WithDefaultLabel)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
})
assert.Nil(t, err)
assert.Equal(t, testCluster1, target.ID)
assert.True(t, target.Enabled)
}
34 changes: 34 additions & 0 deletions pkg/executioncluster/impl/testdata/clusters_config2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
clusters:
labelClusterMap:
one:
- id: testcluster1
weight: 1
two:
- id: testcluster2
weight: 1
three:
- id: testcluster3
weight: 1
clusterConfigs:
- name: "testcluster1"
endpoint: "testcluster1_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster1/token"
certPath: "/path/to/testcluster1/cert"
- name: "testcluster2"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"
- name: "testcluster3"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
clusters:
defaultExecutionLabel: one
labelClusterMap:
one:
- id: testcluster1
weight: 1
two:
- id: testcluster2
weight: 1
three:
- id: testcluster3
weight: 1
clusterConfigs:
- name: "testcluster1"
endpoint: "testcluster1_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster1/token"
certPath: "/path/to/testcluster1/cert"
- name: "testcluster2"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"
- name: "testcluster3"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"

9 changes: 9 additions & 0 deletions pkg/runtime/cluster_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func (p *ClusterConfigurationProvider) GetClusterConfigs() []interfaces.ClusterC
return make([]interfaces.ClusterConfig, 0)
}

func (p *ClusterConfigurationProvider) GetDefaultExecutionLabel() string {
if clusterConfig != nil {
clusters := clusterConfig.GetConfig().(*interfaces.Clusters)
return clusters.DefaultExecutionLabel
}
logger.Debug(context.Background(), "Failed to find default execution label in config. Will use random cluster if no execution label matches.")
return ""
}

func NewClusterConfigurationProvider() interfaces.ClusterConfiguration {
clusterConfigProvider := ClusterConfigurationProvider{}
clusterNameMap := make(map[string]bool)
Expand Down
8 changes: 6 additions & 2 deletions pkg/runtime/interfaces/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func (auth Auth) GetToken() (string, error) {
}

type Clusters struct {
ClusterConfigs []ClusterConfig `json:"clusterConfigs"`
LabelClusterMap map[string][]ClusterEntity `json:"labelClusterMap"`
ClusterConfigs []ClusterConfig `json:"clusterConfigs"`
LabelClusterMap map[string][]ClusterEntity `json:"labelClusterMap"`
DefaultExecutionLabel string `json:"defaultExecutionLabel"`
}

//go:generate mockery -name ClusterConfiguration -case=underscore -output=../mocks -case=underscore
Expand All @@ -56,4 +57,7 @@ type ClusterConfiguration interface {

// Returns label cluster map for routing
GetLabelClusterMap() map[string][]ClusterEntity

// Returns default execution label used as fallback if no execution cluster was explicitly defined.
GetDefaultExecutionLabel() string
}
32 changes: 32 additions & 0 deletions pkg/runtime/mocks/cluster_configuration.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.