Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add default execution label as fallback if no other label matches
Browse files Browse the repository at this point in the history
Signed-off-by: sgref <[email protected]>
  • Loading branch information
sgref committed Sep 30, 2022
1 parent b962ed7 commit 8d13d5f
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 12 deletions.
19 changes: 17 additions & 2 deletions pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type RandomClusterSelector struct {
equalWeightedAllClusters random.WeightedRandomList
labelWeightedRandomMap map[string]random.WeightedRandomList
resourceManager managerInterfaces.ResourceInterface
defaultExecutionLabel string
}

func getRandSource(seed string) (rand.Source, error) {
Expand Down Expand Up @@ -111,6 +112,7 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu
return nil, err
}
}

var weightedRandomList random.WeightedRandomList
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label := resource.Attributes.GetExecutionClusterLabel().Value
Expand All @@ -123,8 +125,17 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu
} else {
logger.Debugf(ctx, "No override found for the spec %v", spec)
}
// If there is no label associated (or) if the label is invalid, choose from all enabled clusters.
// Note that if there is a valid label with zero "Enabled" clusters, we still choose from all enabled ones.

if weightedRandomList == nil {
if s.defaultExecutionLabel != "" {
if _, ok := s.labelWeightedRandomMap[s.defaultExecutionLabel]; ok {
weightedRandomList = s.labelWeightedRandomMap[s.defaultExecutionLabel]
} else {
logger.Warnf(ctx, "No cluster mapping found for the default execution label %s", s.defaultExecutionLabel)
}
}
}

if weightedRandomList == nil {
weightedRandomList = s.equalWeightedAllClusters
}
Expand All @@ -148,6 +159,9 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu

func NewRandomClusterSelector(listTargets interfaces.ListTargetsInterface, config runtime.Configuration,
db repositoryInterfaces.Repository) (interfaces.ClusterInterface, error) {

defaultExecutionLabel := config.ClusterConfiguration().GetDefaultExecutionLabel()

equalWeightedAllClusters, err := convertToRandomWeightedList(context.Background(), listTargets.GetValidTargets())
if err != nil {
return nil, err
Expand All @@ -161,5 +175,6 @@ func NewRandomClusterSelector(listTargets interfaces.ListTargetsInterface, confi
resourceManager: resources.NewResourceManager(db, config.ApplicationConfiguration()),
equalWeightedAllClusters: equalWeightedAllClusters,
ListTargetsInterface: listTargets,
defaultExecutionLabel: defaultExecutionLabel,
}, nil
}
125 changes: 117 additions & 8 deletions pkg/executioncluster/impl/random_cluster_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/stretchr/testify/assert"
)

const testProject = "project"
const testDomain = "domain"
const testWorkflow = "name"

const (
testCluster1 = "testcluster1"
testCluster2 = "testcluster2"
testCluster3 = "testcluster3"
testProject = "project"
testDomain = "domain"
testWorkflow = "name"
testCluster1 = "testcluster1"
testCluster2 = "testcluster2"
testCluster3 = "testcluster3"
clusterConfig1 = "clusters_config.yaml"
clusterConfig2 = "clusters_config2.yaml"
clusterConfig2WithDefaultLabel = "clusters_config2_default_label.yaml"
)

func initTestConfig(fileName string) error {
Expand All @@ -47,7 +49,7 @@ func initTestConfig(fileName string) error {
}

func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface {
err := initTestConfig("clusters_config.yaml")
err := initTestConfig(clusterConfig1)
assert.NoError(t, err)

db := repo_mock.NewMockRepository()
Expand Down Expand Up @@ -119,6 +121,74 @@ func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface
return randomCluster
}

func getRandomClusterSelectorWithDefaultLabelForTest(t *testing.T, configFile string) interfaces2.ClusterInterface {
err := initTestConfig(configFile)
assert.NoError(t, err)

db := repo_mock.NewMockRepository()
db.ResourceRepo().(*repo_mock.MockResourceRepo).GetFunction = func(ctx context.Context, ID repo_interface.ResourceID) (resource models.Resource, e error) {
assert.Equal(t, "EXECUTION_CLUSTER_LABEL", ID.ResourceType)
if ID.Project == "" {
return models.Resource{}, errors.NewFlyteAdminErrorf(codes.NotFound,
"Resource [%+v] not found", ID)
}
response := models.Resource{
Project: ID.Project,
Domain: ID.Domain,
Workflow: ID.Workflow,
ResourceType: ID.ResourceType,
LaunchPlan: ID.LaunchPlan,
}
if ID.Project == testProject && ID.Domain == testDomain {
matchingAttributes := &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ExecutionClusterLabel{
ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "two",
},
},
}
marshalledMatchingAttributes, _ := proto.Marshal(matchingAttributes)
response.Attributes = marshalledMatchingAttributes
}
return response, nil
}
configProvider := runtime.NewConfigurationProvider()
listTargetsProvider := mocks.ListTargetsInterface{}
validTargets := map[string]*executioncluster.ExecutionTarget{
testCluster1: {
ID: testCluster1,
Enabled: true,
},
testCluster2: {
ID: testCluster2,
Enabled: true,
},
testCluster3: {
ID: testCluster3,
Enabled: true,
},
}
targets := map[string]*executioncluster.ExecutionTarget{
testCluster1: {
ID: testCluster1,
Enabled: true,
},
testCluster2: {
ID: testCluster2,
Enabled: true,
},
testCluster3: {
ID: testCluster3,
Enabled: true,
},
}
listTargetsProvider.OnGetValidTargets().Return(validTargets)
listTargetsProvider.OnGetAllTargets().Return(targets)
randomCluster, err := NewRandomClusterSelector(&listTargetsProvider, configProvider, db)
assert.NoError(t, err)
return randomCluster
}

func TestRandomClusterSelectorGetTarget(t *testing.T) {
cluster := getRandomClusterSelectorForTest(t)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: testCluster1})
Expand Down Expand Up @@ -198,3 +268,42 @@ func TestRandomClusterSelectorGetAllValidTargets(t *testing.T) {
targets := cluster.GetValidTargets()
assert.Equal(t, 2, len(targets))
}

func TestRandomClusterSelectorGetTargetWithFallbackToDefault1(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
})
assert.Nil(t, err)
assert.Equal(t, testCluster3, target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithFallbackToDefault2(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: testDomain,
Workflow: testWorkflow,
ExecutionID: "e3",
})
assert.Nil(t, err)
assert.Equal(t, testCluster2, target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithFallbackToDefault3(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2WithDefaultLabel)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
})
assert.Nil(t, err)
assert.Equal(t, testCluster1, target.ID)
assert.True(t, target.Enabled)
}
34 changes: 34 additions & 0 deletions pkg/executioncluster/impl/testdata/clusters_config2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
clusters:
labelClusterMap:
one:
- id: testcluster1
weight: 1
two:
- id: testcluster2
weight: 1
three:
- id: testcluster3
weight: 1
clusterConfigs:
- name: "testcluster1"
endpoint: "testcluster1_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster1/token"
certPath: "/path/to/testcluster1/cert"
- name: "testcluster2"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"
- name: "testcluster3"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
clusters:
defaultExecutionLabel: one
labelClusterMap:
one:
- id: testcluster1
weight: 1
two:
- id: testcluster2
weight: 1
three:
- id: testcluster3
weight: 1
clusterConfigs:
- name: "testcluster1"
endpoint: "testcluster1_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster1/token"
certPath: "/path/to/testcluster1/cert"
- name: "testcluster2"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"
- name: "testcluster3"
endpoint: "testcluster2_endpoint"
enabled: true
auth:
type: "file_path"
tokenPath: "/path/to/testcluster2/token"
certPath: "/path/to/testcluster2/cert"

9 changes: 9 additions & 0 deletions pkg/runtime/cluster_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func (p *ClusterConfigurationProvider) GetClusterConfigs() []interfaces.ClusterC
return make([]interfaces.ClusterConfig, 0)
}

func (p *ClusterConfigurationProvider) GetDefaultExecutionLabel() string {
if clusterConfig != nil {
clusters := clusterConfig.GetConfig().(*interfaces.Clusters)
return clusters.DefaultExecutionLabel
}
logger.Debug(context.Background(), "Failed to find default execution label in config. Will use random cluster if no execution label matches.")
return ""
}

func NewClusterConfigurationProvider() interfaces.ClusterConfiguration {
clusterConfigProvider := ClusterConfigurationProvider{}
clusterNameMap := make(map[string]bool)
Expand Down
8 changes: 6 additions & 2 deletions pkg/runtime/interfaces/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func (auth Auth) GetToken() (string, error) {
}

type Clusters struct {
ClusterConfigs []ClusterConfig `json:"clusterConfigs"`
LabelClusterMap map[string][]ClusterEntity `json:"labelClusterMap"`
ClusterConfigs []ClusterConfig `json:"clusterConfigs"`
LabelClusterMap map[string][]ClusterEntity `json:"labelClusterMap"`
DefaultExecutionLabel string `json:"defaultExecutionLabel"`
}

//go:generate mockery -name ClusterConfiguration -case=underscore -output=../mocks -case=underscore
Expand All @@ -56,4 +57,7 @@ type ClusterConfiguration interface {

// Returns label cluster map for routing
GetLabelClusterMap() map[string][]ClusterEntity

// Returns default execution label used as fallback if no execution cluster was explicitly defined.
GetDefaultExecutionLabel() string
}
32 changes: 32 additions & 0 deletions pkg/runtime/mocks/cluster_configuration.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8d13d5f

Please sign in to comment.