Skip to content

Commit

Permalink
add arguments for action
Browse files Browse the repository at this point in the history
  • Loading branch information
sivanzcw committed Dec 6, 2019
1 parent 92bf4be commit 6d06b2e
Show file tree
Hide file tree
Showing 14 changed files with 267 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestAllocate(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

allocate.Execute(ssn)
Expand Down
24 changes: 23 additions & 1 deletion pkg/scheduler/actions/enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ import (
"volcano.sh/volcano/pkg/scheduler/util"
)

const (
// overCommitFactor is resource overCommit factor for enqueue action
// It determines the number of `pending` pods that the scheduler will tolerate
// when the resources of the cluster is insufficient
overCommitFactor = "overCommitFactor"
)

var (
// defaultOverCommitFactor defines the default overCommit resource factor for enqueue action
defaultOverCommitFactor = 1.2
)

type enqueueAction struct {
ssn *framework.Session
}
Expand Down Expand Up @@ -77,7 +89,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
emptyRes := api.EmptyResource()
nodesIdleRes := api.EmptyResource()
for _, node := range ssn.Nodes {
nodesIdleRes.Add(node.Allocatable.Clone().Multi(1.2).Sub(node.Used))
nodesIdleRes.Add(node.Allocatable.Clone().Multi(enqueue.getOverCommitFactor(ssn)).Sub(node.Used))
}

for {
Expand Down Expand Up @@ -122,3 +134,13 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
}

func (enqueue *enqueueAction) UnInitialize() {}

func (enqueue *enqueueAction) getOverCommitFactor(ssn *framework.Session) float64 {
factor := defaultOverCommitFactor
arg := framework.GetArgOfActionFromConf(ssn.Configurations, enqueue.Name())
if arg != nil {
arg.GetFloat64(&factor, overCommitFactor)
}

return factor
}
53 changes: 53 additions & 0 deletions pkg/scheduler/actions/enqueue/enqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package enqueue

import (
"testing"

"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
)

func TestGetOverCommitFactor(t *testing.T) {
cases := []struct {
name string
ssn *framework.Session
expectedValue float64
}{
{
name: "arguments of action not exist",
ssn: &framework.Session{
Configurations: []conf.Configuration{
{
Name: "allocate",
Arguments: map[string]string{
"placeholder": "placeholder",
},
},
},
},
expectedValue: 1.2,
},
{
name: "arguments of action exist",
ssn: &framework.Session{
Configurations: []conf.Configuration{
{
Name: "enqueue",
Arguments: map[string]string{
overCommitFactor: "2",
},
},
},
},
expectedValue: 2,
},
}

enqueue := New()
for index, c := range cases {
factor := enqueue.getOverCommitFactor(c.ssn)
if factor != c.expectedValue {
t.Errorf("index %d, case %s, expected %v, but got %v", index, c.name, c.expectedValue, factor)
}
}
}
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestPreempt(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

allocate.Execute(ssn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestReclaim(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

reclaim.Execute(ssn)
Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,23 @@ type SchedulerConfiguration struct {
Actions string `yaml:"actions"`
// Tiers defines plugins in different tiers
Tiers []Tier `yaml:"tiers"`
// Configurations is configuration for actions
Configurations []Configuration `yaml:"configurations"`
}

// Tier defines plugin tier
type Tier struct {
Plugins []PluginOption `yaml:"plugins"`
}

// Configuration is configuration of action
type Configuration struct {
// Name is name of action
Name string `yaml:"name"`
// Arguments defines the different arguments that can be given to specified action
Arguments map[string]string `yaml:"arguments"`
}

// PluginOption defines the options of plugin
type PluginOption struct {
// The name of Plugin
Expand Down
37 changes: 35 additions & 2 deletions pkg/scheduler/framework/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package framework
import (
"strconv"

"volcano.sh/volcano/pkg/scheduler/conf"

"k8s.io/klog"
)

// Arguments map
type Arguments map[string]string

//GetInt get the integer value from string
// GetInt get the integer value from string
func (a Arguments) GetInt(ptr *int, key string) {
if ptr == nil {
return
Expand All @@ -45,7 +47,27 @@ func (a Arguments) GetInt(ptr *int, key string) {
*ptr = value
}

//GetBool get the bool value from string
// GetFloat64 get the float64 value from string
func (a Arguments) GetFloat64(ptr *float64, key string) {
if ptr == nil {
return
}

argv, ok := a[key]
if !ok || len(argv) == 0 {
return
}

value, err := strconv.ParseFloat(argv, 64)
if err != nil {
klog.Warningf("Could not parse argument: %s for key %s, with err %v", argv, key, err)
return
}

*ptr = value
}

// GetBool get the bool value from string
func (a Arguments) GetBool(ptr *bool, key string) {
if ptr == nil {
return
Expand All @@ -64,3 +86,14 @@ func (a Arguments) GetBool(ptr *bool, key string) {

*ptr = value
}

// GetArgOfActionFromConf return argument of action reading from configuration of schedule
func GetArgOfActionFromConf(configurations []conf.Configuration, actionName string) Arguments {
for _, c := range configurations {
if c.Name == actionName {
return c.Arguments
}
}

return nil
}
111 changes: 111 additions & 0 deletions pkg/scheduler/framework/arguments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package framework

import (
"reflect"
"testing"

"volcano.sh/volcano/pkg/scheduler/conf"
)

type GetIntTestCases struct {
Expand Down Expand Up @@ -74,3 +77,111 @@ func TestArgumentsGetInt(t *testing.T) {
}
}
}

func TestArgumentsGetFloat64(t *testing.T) {
key1 := "float64key"

cases := []struct {
name string
arg Arguments
key string
baseValue float64
expectValue float64
}{
{
name: "key not exist",
arg: Arguments{
"anotherKey": "12",
},
key: key1,
baseValue: 1.2,
expectValue: 1.2,
},
{
name: "key exist",
arg: Arguments{
key1: "1.5",
},
key: key1,
baseValue: 1.2,
expectValue: 1.5,
},
{
name: "value of key invalid",
arg: Arguments{
key1: "errorValue",
},
key: key1,
baseValue: 1.2,
expectValue: 1.2,
},
{
name: "value of key null",
arg: Arguments{
key1: "",
},
key: key1,
baseValue: 1.2,
expectValue: 1.2,
},
}

for index, c := range cases {
baseValue := c.baseValue
c.arg.GetFloat64(&baseValue, c.key)
if baseValue != c.expectValue {
t.Errorf("index %d, case %s, value should be %v, but not %v", index, c.name, c.expectValue, baseValue)
}
}
}

func TestGetArgOfActionFromConf(t *testing.T) {
cases := []struct {
name string
configurations []conf.Configuration
action string
expectedArguments Arguments
}{
{
name: "action exist in configurations",
configurations: []conf.Configuration{
{
Name: "enqueue",
Arguments: map[string]string{
"overCommitFactor": "1.5",
},
},
{
Name: "allocate",
Arguments: map[string]string{
"placeholde": "placeholde",
},
},
},
action: "enqueue",
expectedArguments: map[string]string{
"overCommitFactor": "1.5",
},
},
{
name: "action not exist in configurations",
configurations: []conf.Configuration{
{
Name: "enqueue",
Arguments: map[string]string{
"overCommitFactor": "1.5",
},
},
},
action: "allocate",
expectedArguments: nil,
},
}

for index, c := range cases {
arg := GetArgOfActionFromConf(c.configurations, c.action)
if false == reflect.DeepEqual(arg, c.expectedArguments) {
t.Errorf("index %d, case %s,expected %v, but got %v", index, c.name, c.expectedArguments, arg)
}
}
}
3 changes: 2 additions & 1 deletion pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
ssn := openSession(cache)
ssn.Tiers = tiers
ssn.Configurations = configurations

for _, tier := range tiers {
for _, plugin := range tier.Plugins {
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type Session struct {
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo

Backlog []*api.JobInfo
Tiers []conf.Tier
Backlog []*api.JobInfo
Tiers []conf.Tier
Configurations []conf.Configuration

plugins map[string]Plugin
eventHandlers []*EventHandler
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/binpack/binpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func TestNode(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

for _, job := range ssn.Jobs {
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Scheduler struct {
config *rest.Config
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
schedulerConf string
schedulePeriod time.Duration
}
Expand Down Expand Up @@ -75,7 +76,7 @@ func (pc *Scheduler) runOnce() {

pc.loadSchedulerConf()

ssn := framework.OpenSession(pc.cache, pc.plugins)
ssn := framework.OpenSession(pc.cache, pc.plugins, pc.configurations)
defer framework.CloseSession(ssn)

for _, action := range pc.actions {
Expand All @@ -98,7 +99,7 @@ func (pc *Scheduler) loadSchedulerConf() {
}
}

pc.actions, pc.plugins, err = loadSchedulerConf(schedConf)
pc.actions, pc.plugins, pc.configurations, err = loadSchedulerConf(schedConf)
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 6d06b2e

Please sign in to comment.