From 55f10312f9aa8e56b10c41f830e0fd6912c8293e Mon Sep 17 00:00:00 2001 From: Zhang Jinghui Date: Thu, 5 Dec 2019 22:15:23 +0800 Subject: [PATCH] add arguments for action --- .../actions/allocate/allocate_test.go | 2 +- pkg/scheduler/actions/enqueue/enqueue.go | 24 +++- pkg/scheduler/actions/enqueue/enqueue_test.go | 53 +++++++++ pkg/scheduler/actions/preempt/preempt_test.go | 2 +- pkg/scheduler/actions/reclaim/reclaim_test.go | 2 +- pkg/scheduler/conf/scheduler_conf.go | 10 ++ pkg/scheduler/framework/arguments.go | 37 +++++- pkg/scheduler/framework/arguments_test.go | 111 ++++++++++++++++++ pkg/scheduler/framework/framework.go | 3 +- pkg/scheduler/framework/session.go | 5 +- pkg/scheduler/plugins/binpack/binpack_test.go | 2 +- pkg/scheduler/scheduler.go | 5 +- pkg/scheduler/util.go | 8 +- pkg/scheduler/util_test.go | 21 +++- 14 files changed, 267 insertions(+), 18 deletions(-) create mode 100644 pkg/scheduler/actions/enqueue/enqueue_test.go diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 63c173ef55..0a74be5e5e 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -202,7 +202,7 @@ func TestAllocate(t *testing.T) { }, }, }, - }) + }, nil) defer framework.CloseSession(ssn) allocate.Execute(ssn) diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index 11c7765a79..8008f5ba47 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -25,6 +25,18 @@ import ( "volcano.sh/volcano/pkg/scheduler/util" ) +const ( + // overCommitFactor is resource overCommit factor for enqueue action + // It determines the number of `pending` pods that the scheduler will tolerate + // when the resources of the cluster is insufficient + overCommitFactor = "overcommit-factor" +) + +var ( + // defaultOverCommitFactor defines the default overCommit resource factor for enqueue action + defaultOverCommitFactor = 1.2 +) + type enqueueAction struct { ssn *framework.Session } @@ -77,7 +89,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { emptyRes := api.EmptyResource() nodesIdleRes := api.EmptyResource() for _, node := range ssn.Nodes { - nodesIdleRes.Add(node.Allocatable.Clone().Multi(1.2).Sub(node.Used)) + nodesIdleRes.Add(node.Allocatable.Clone().Multi(enqueue.getOverCommitFactor(ssn)).Sub(node.Used)) } for { @@ -122,3 +134,13 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { } func (enqueue *enqueueAction) UnInitialize() {} + +func (enqueue *enqueueAction) getOverCommitFactor(ssn *framework.Session) float64 { + factor := defaultOverCommitFactor + arg := framework.GetArgOfActionFromConf(ssn.Configurations, enqueue.Name()) + if arg != nil { + arg.GetFloat64(&factor, overCommitFactor) + } + + return factor +} diff --git a/pkg/scheduler/actions/enqueue/enqueue_test.go b/pkg/scheduler/actions/enqueue/enqueue_test.go new file mode 100644 index 0000000000..5c3787e192 --- /dev/null +++ b/pkg/scheduler/actions/enqueue/enqueue_test.go @@ -0,0 +1,53 @@ +package enqueue + +import ( + "testing" + + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" +) + +func TestGetOverCommitFactor(t *testing.T) { + cases := []struct { + name string + ssn *framework.Session + expectedValue float64 + }{ + { + name: "arguments of action not exist", + ssn: &framework.Session{ + Configurations: []conf.Configuration{ + { + Name: "allocate", + Arguments: map[string]string{ + "placeholder": "placeholder", + }, + }, + }, + }, + expectedValue: 1.2, + }, + { + name: "arguments of action exist", + ssn: &framework.Session{ + Configurations: []conf.Configuration{ + { + Name: "enqueue", + Arguments: map[string]string{ + overCommitFactor: "2", + }, + }, + }, + }, + expectedValue: 2, + }, + } + + enqueue := New() + for index, c := range cases { + factor := enqueue.getOverCommitFactor(c.ssn) + if factor != c.expectedValue { + t.Errorf("index %d, case %s, expected %v, but got %v", index, c.name, c.expectedValue, factor) + } + } +} diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go index 99c02023e4..f8be6703d8 100644 --- a/pkg/scheduler/actions/preempt/preempt_test.go +++ b/pkg/scheduler/actions/preempt/preempt_test.go @@ -282,7 +282,7 @@ func TestPreempt(t *testing.T) { }, }, }, - }) + }, nil) defer framework.CloseSession(ssn) preempt.Execute(ssn) diff --git a/pkg/scheduler/actions/reclaim/reclaim_test.go b/pkg/scheduler/actions/reclaim/reclaim_test.go index ac63dda3fc..1d32721f05 100644 --- a/pkg/scheduler/actions/reclaim/reclaim_test.go +++ b/pkg/scheduler/actions/reclaim/reclaim_test.go @@ -150,7 +150,7 @@ func TestReclaim(t *testing.T) { }, }, }, - }) + }, nil) defer framework.CloseSession(ssn) reclaim.Execute(ssn) diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index a45c6e4697..55378fd77f 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -22,6 +22,8 @@ type SchedulerConfiguration struct { Actions string `yaml:"actions"` // Tiers defines plugins in different tiers Tiers []Tier `yaml:"tiers"` + // Configurations is configuration for actions + Configurations []Configuration `yaml:"configurations"` } // Tier defines plugin tier @@ -29,6 +31,14 @@ type Tier struct { Plugins []PluginOption `yaml:"plugins"` } +// Configuration is configuration of action +type Configuration struct { + // Name is name of action + Name string `yaml:"name"` + // Arguments defines the different arguments that can be given to specified action + Arguments map[string]string `yaml:"arguments"` +} + // PluginOption defines the options of plugin type PluginOption struct { // The name of Plugin diff --git a/pkg/scheduler/framework/arguments.go b/pkg/scheduler/framework/arguments.go index b94462e4e2..be31cc7805 100644 --- a/pkg/scheduler/framework/arguments.go +++ b/pkg/scheduler/framework/arguments.go @@ -19,13 +19,15 @@ package framework import ( "strconv" + "volcano.sh/volcano/pkg/scheduler/conf" + "k8s.io/klog" ) // Arguments map type Arguments map[string]string -//GetInt get the integer value from string +// GetInt get the integer value from string func (a Arguments) GetInt(ptr *int, key string) { if ptr == nil { return @@ -45,7 +47,27 @@ func (a Arguments) GetInt(ptr *int, key string) { *ptr = value } -//GetBool get the bool value from string +// GetFloat64 get the float64 value from string +func (a Arguments) GetFloat64(ptr *float64, key string) { + if ptr == nil { + return + } + + argv, ok := a[key] + if !ok || len(argv) == 0 { + return + } + + value, err := strconv.ParseFloat(argv, 64) + if err != nil { + klog.Warningf("Could not parse argument: %s for key %s, with err %v", argv, key, err) + return + } + + *ptr = value +} + +// GetBool get the bool value from string func (a Arguments) GetBool(ptr *bool, key string) { if ptr == nil { return @@ -64,3 +86,14 @@ func (a Arguments) GetBool(ptr *bool, key string) { *ptr = value } + +// GetArgOfActionFromConf return argument of action reading from configuration of schedule +func GetArgOfActionFromConf(configurations []conf.Configuration, actionName string) Arguments { + for _, c := range configurations { + if c.Name == actionName { + return c.Arguments + } + } + + return nil +} diff --git a/pkg/scheduler/framework/arguments_test.go b/pkg/scheduler/framework/arguments_test.go index f9d5f30a03..44559363df 100644 --- a/pkg/scheduler/framework/arguments_test.go +++ b/pkg/scheduler/framework/arguments_test.go @@ -17,7 +17,10 @@ limitations under the License. package framework import ( + "reflect" "testing" + + "volcano.sh/volcano/pkg/scheduler/conf" ) type GetIntTestCases struct { @@ -74,3 +77,111 @@ func TestArgumentsGetInt(t *testing.T) { } } } + +func TestArgumentsGetFloat64(t *testing.T) { + key1 := "float64key" + + cases := []struct { + name string + arg Arguments + key string + baseValue float64 + expectValue float64 + }{ + { + name: "key not exist", + arg: Arguments{ + "anotherKey": "12", + }, + key: key1, + baseValue: 1.2, + expectValue: 1.2, + }, + { + name: "key exist", + arg: Arguments{ + key1: "1.5", + }, + key: key1, + baseValue: 1.2, + expectValue: 1.5, + }, + { + name: "value of key invalid", + arg: Arguments{ + key1: "errorValue", + }, + key: key1, + baseValue: 1.2, + expectValue: 1.2, + }, + { + name: "value of key null", + arg: Arguments{ + key1: "", + }, + key: key1, + baseValue: 1.2, + expectValue: 1.2, + }, + } + + for index, c := range cases { + baseValue := c.baseValue + c.arg.GetFloat64(&baseValue, c.key) + if baseValue != c.expectValue { + t.Errorf("index %d, case %s, value should be %v, but not %v", index, c.name, c.expectValue, baseValue) + } + } +} + +func TestGetArgOfActionFromConf(t *testing.T) { + cases := []struct { + name string + configurations []conf.Configuration + action string + expectedArguments Arguments + }{ + { + name: "action exist in configurations", + configurations: []conf.Configuration{ + { + Name: "enqueue", + Arguments: map[string]string{ + "overCommitFactor": "1.5", + }, + }, + { + Name: "allocate", + Arguments: map[string]string{ + "placeholde": "placeholde", + }, + }, + }, + action: "enqueue", + expectedArguments: map[string]string{ + "overCommitFactor": "1.5", + }, + }, + { + name: "action not exist in configurations", + configurations: []conf.Configuration{ + { + Name: "enqueue", + Arguments: map[string]string{ + "overCommitFactor": "1.5", + }, + }, + }, + action: "allocate", + expectedArguments: nil, + }, + } + + for index, c := range cases { + arg := GetArgOfActionFromConf(c.configurations, c.action) + if false == reflect.DeepEqual(arg, c.expectedArguments) { + t.Errorf("index %d, case %s,expected %v, but got %v", index, c.name, c.expectedArguments, arg) + } + } +} diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index dfe4a43234..f66275769e 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -27,9 +27,10 @@ import ( ) // OpenSession start the session -func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session { +func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session { ssn := openSession(cache) ssn.Tiers = tiers + ssn.Configurations = configurations for _, tier := range tiers { for _, plugin := range tier.Plugins { diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 752ed38806..b74c48781a 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -45,8 +45,9 @@ type Session struct { Queues map[api.QueueID]*api.QueueInfo NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo - Backlog []*api.JobInfo - Tiers []conf.Tier + Backlog []*api.JobInfo + Tiers []conf.Tier + Configurations []conf.Configuration plugins map[string]Plugin eventHandlers []*EventHandler diff --git a/pkg/scheduler/plugins/binpack/binpack_test.go b/pkg/scheduler/plugins/binpack/binpack_test.go index 1253793c41..bcc80767ba 100644 --- a/pkg/scheduler/plugins/binpack/binpack_test.go +++ b/pkg/scheduler/plugins/binpack/binpack_test.go @@ -269,7 +269,7 @@ func TestNode(t *testing.T) { }, }, }, - }) + }, nil) defer framework.CloseSession(ssn) for _, job := range ssn.Jobs { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 4808f39c43..56c49b3869 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -36,6 +36,7 @@ type Scheduler struct { config *rest.Config actions []framework.Action plugins []conf.Tier + configurations []conf.Configuration schedulerConf string schedulePeriod time.Duration } @@ -75,7 +76,7 @@ func (pc *Scheduler) runOnce() { pc.loadSchedulerConf() - ssn := framework.OpenSession(pc.cache, pc.plugins) + ssn := framework.OpenSession(pc.cache, pc.plugins, pc.configurations) defer framework.CloseSession(ssn) for _, action := range pc.actions { @@ -98,7 +99,7 @@ func (pc *Scheduler) loadSchedulerConf() { } } - pc.actions, pc.plugins, err = loadSchedulerConf(schedConf) + pc.actions, pc.plugins, pc.configurations, err = loadSchedulerConf(schedConf) if err != nil { panic(err) } diff --git a/pkg/scheduler/util.go b/pkg/scheduler/util.go index 5a57906606..a831414c33 100644 --- a/pkg/scheduler/util.go +++ b/pkg/scheduler/util.go @@ -41,7 +41,7 @@ tiers: - name: nodeorder ` -func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, error) { +func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, []conf.Configuration, error) { var actions []framework.Action schedulerConf := &conf.SchedulerConfiguration{} @@ -50,7 +50,7 @@ func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, error) copy(buf, confStr) if err := yaml.Unmarshal(buf, schedulerConf); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Set default settings for each plugin if not set @@ -65,11 +65,11 @@ func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, error) if action, found := framework.GetAction(strings.TrimSpace(actionName)); found { actions = append(actions, action) } else { - return nil, nil, fmt.Errorf("failed to found Action %s, ignore it", actionName) + return nil, nil, nil, fmt.Errorf("failed to found Action %s, ignore it", actionName) } } - return actions, schedulerConf.Tiers, nil + return actions, schedulerConf.Tiers, schedulerConf.Configurations, nil } func readSchedulerConf(confPath string) (string, error) { diff --git a/pkg/scheduler/util_test.go b/pkg/scheduler/util_test.go index cd3829c9b4..a06e077b4b 100644 --- a/pkg/scheduler/util_test.go +++ b/pkg/scheduler/util_test.go @@ -26,7 +26,11 @@ import ( func TestLoadSchedulerConf(t *testing.T) { configuration := ` -actions: "allocate, backfill" +actions: "enqueue, allocate, backfill" +configurations: +- name: enqueue + arguments: + "overcommit-factor": 1.5 tiers: - plugins: - name: priority @@ -142,7 +146,16 @@ tiers: }, } - _, tiers, err := loadSchedulerConf(configuration) + expectedConfigurations := []conf.Configuration{ + { + Name: "enqueue", + Arguments: map[string]string{ + "overcommit-factor": "1.5", + }, + }, + } + + _, tiers, configurations, err := loadSchedulerConf(configuration) if err != nil { t.Errorf("Failed to load scheduler configuration: %v", err) } @@ -150,4 +163,8 @@ tiers: t.Errorf("Failed to set default settings for plugins, expected: %+v, got %+v", expectedTiers, tiers) } + if !reflect.DeepEqual(configurations, expectedConfigurations) { + t.Errorf("Wrong configuration, expected: %+v, got %+v", + expectedConfigurations, configurations) + } }