Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add arguments for action #587

Merged
merged 1 commit into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestAllocate(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

allocate.Execute(ssn)
Expand Down
24 changes: 23 additions & 1 deletion pkg/scheduler/actions/enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ import (
"volcano.sh/volcano/pkg/scheduler/util"
)

const (
// overCommitFactor is resource overCommit factor for enqueue action
// It determines the number of `pending` pods that the scheduler will tolerate
// when the resources of the cluster is insufficient
overCommitFactor = "overcommit-factor"
)

var (
// defaultOverCommitFactor defines the default overCommit resource factor for enqueue action
defaultOverCommitFactor = 1.2
)

type enqueueAction struct {
ssn *framework.Session
}
Expand Down Expand Up @@ -77,7 +89,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
emptyRes := api.EmptyResource()
nodesIdleRes := api.EmptyResource()
for _, node := range ssn.Nodes {
nodesIdleRes.Add(node.Allocatable.Clone().Multi(1.2).Sub(node.Used))
nodesIdleRes.Add(node.Allocatable.Clone().Multi(enqueue.getOverCommitFactor(ssn)).Sub(node.Used))
}

for {
Expand Down Expand Up @@ -122,3 +134,13 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
}

func (enqueue *enqueueAction) UnInitialize() {}

func (enqueue *enqueueAction) getOverCommitFactor(ssn *framework.Session) float64 {
factor := defaultOverCommitFactor
arg := framework.GetArgOfActionFromConf(ssn.Configurations, enqueue.Name())
if arg != nil {
arg.GetFloat64(&factor, overCommitFactor)
}

return factor
}
53 changes: 53 additions & 0 deletions pkg/scheduler/actions/enqueue/enqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package enqueue

import (
"testing"

"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
)

func TestGetOverCommitFactor(t *testing.T) {
cases := []struct {
name string
ssn *framework.Session
expectedValue float64
}{
{
name: "arguments of action not exist",
ssn: &framework.Session{
Configurations: []conf.Configuration{
{
Name: "allocate",
Arguments: map[string]string{
"placeholder": "placeholder",
},
},
},
},
expectedValue: 1.2,
},
{
name: "arguments of action exist",
ssn: &framework.Session{
Configurations: []conf.Configuration{
{
Name: "enqueue",
Arguments: map[string]string{
overCommitFactor: "2",
},
},
},
},
expectedValue: 2,
},
}

enqueue := New()
for index, c := range cases {
factor := enqueue.getOverCommitFactor(c.ssn)
if factor != c.expectedValue {
t.Errorf("index %d, case %s, expected %v, but got %v", index, c.name, c.expectedValue, factor)
}
}
}
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestPreempt(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

preempt.Execute(ssn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestReclaim(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

reclaim.Execute(ssn)
Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,23 @@ type SchedulerConfiguration struct {
Actions string `yaml:"actions"`
// Tiers defines plugins in different tiers
Tiers []Tier `yaml:"tiers"`
// Configurations is configuration for actions
Configurations []Configuration `yaml:"configurations"`
}

// Tier defines plugin tier
type Tier struct {
Plugins []PluginOption `yaml:"plugins"`
}

// Configuration is configuration of action
type Configuration struct {
// Name is name of action
Name string `yaml:"name"`
// Arguments defines the different arguments that can be given to specified action
Arguments map[string]string `yaml:"arguments"`
}

// PluginOption defines the options of plugin
type PluginOption struct {
// The name of Plugin
Expand Down
37 changes: 35 additions & 2 deletions pkg/scheduler/framework/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package framework
import (
"strconv"

"volcano.sh/volcano/pkg/scheduler/conf"

"k8s.io/klog"
)

// Arguments map
type Arguments map[string]string

//GetInt get the integer value from string
// GetInt get the integer value from string
func (a Arguments) GetInt(ptr *int, key string) {
if ptr == nil {
return
Expand All @@ -45,7 +47,27 @@ func (a Arguments) GetInt(ptr *int, key string) {
*ptr = value
}

//GetBool get the bool value from string
// GetFloat64 get the float64 value from string
func (a Arguments) GetFloat64(ptr *float64, key string) {
if ptr == nil {
return
}

argv, ok := a[key]
if !ok || len(argv) == 0 {
return
}

value, err := strconv.ParseFloat(argv, 64)
if err != nil {
klog.Warningf("Could not parse argument: %s for key %s, with err %v", argv, key, err)
return
}

*ptr = value
}

// GetBool get the bool value from string
func (a Arguments) GetBool(ptr *bool, key string) {
if ptr == nil {
return
Expand All @@ -64,3 +86,14 @@ func (a Arguments) GetBool(ptr *bool, key string) {

*ptr = value
}

// GetArgOfActionFromConf return argument of action reading from configuration of schedule
func GetArgOfActionFromConf(configurations []conf.Configuration, actionName string) Arguments {
for _, c := range configurations {
if c.Name == actionName {
return c.Arguments
}
}

return nil
}
111 changes: 111 additions & 0 deletions pkg/scheduler/framework/arguments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package framework

import (
"reflect"
"testing"

"volcano.sh/volcano/pkg/scheduler/conf"
)

type GetIntTestCases struct {
Expand Down Expand Up @@ -74,3 +77,111 @@ func TestArgumentsGetInt(t *testing.T) {
}
}
}

func TestArgumentsGetFloat64(t *testing.T) {
key1 := "float64key"

cases := []struct {
name string
arg Arguments
key string
baseValue float64
expectValue float64
}{
{
name: "key not exist",
arg: Arguments{
"anotherKey": "12",
},
key: key1,
baseValue: 1.2,
expectValue: 1.2,
},
{
name: "key exist",
arg: Arguments{
key1: "1.5",
},
key: key1,
baseValue: 1.2,
expectValue: 1.5,
},
{
name: "value of key invalid",
arg: Arguments{
key1: "errorValue",
},
key: key1,
baseValue: 1.2,
expectValue: 1.2,
},
{
name: "value of key null",
arg: Arguments{
key1: "",
},
key: key1,
baseValue: 1.2,
expectValue: 1.2,
},
}

for index, c := range cases {
baseValue := c.baseValue
c.arg.GetFloat64(&baseValue, c.key)
if baseValue != c.expectValue {
t.Errorf("index %d, case %s, value should be %v, but not %v", index, c.name, c.expectValue, baseValue)
}
}
}

func TestGetArgOfActionFromConf(t *testing.T) {
cases := []struct {
name string
configurations []conf.Configuration
action string
expectedArguments Arguments
}{
{
name: "action exist in configurations",
configurations: []conf.Configuration{
{
Name: "enqueue",
Arguments: map[string]string{
"overCommitFactor": "1.5",
},
},
{
Name: "allocate",
Arguments: map[string]string{
"placeholde": "placeholde",
},
},
},
action: "enqueue",
expectedArguments: map[string]string{
"overCommitFactor": "1.5",
},
},
{
name: "action not exist in configurations",
configurations: []conf.Configuration{
{
Name: "enqueue",
Arguments: map[string]string{
"overCommitFactor": "1.5",
},
},
},
action: "allocate",
expectedArguments: nil,
},
}

for index, c := range cases {
arg := GetArgOfActionFromConf(c.configurations, c.action)
if false == reflect.DeepEqual(arg, c.expectedArguments) {
t.Errorf("index %d, case %s,expected %v, but got %v", index, c.name, c.expectedArguments, arg)
}
}
}
3 changes: 2 additions & 1 deletion pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
ssn := openSession(cache)
ssn.Tiers = tiers
ssn.Configurations = configurations

for _, tier := range tiers {
for _, plugin := range tier.Plugins {
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type Session struct {
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo

Backlog []*api.JobInfo
Tiers []conf.Tier
Backlog []*api.JobInfo
Tiers []conf.Tier
Configurations []conf.Configuration

plugins map[string]Plugin
eventHandlers []*EventHandler
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/binpack/binpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func TestNode(t *testing.T) {
},
},
},
})
}, nil)
defer framework.CloseSession(ssn)

for _, job := range ssn.Jobs {
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Scheduler struct {
config *rest.Config
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
schedulerConf string
schedulePeriod time.Duration
}
Expand Down Expand Up @@ -75,7 +76,7 @@ func (pc *Scheduler) runOnce() {

pc.loadSchedulerConf()

ssn := framework.OpenSession(pc.cache, pc.plugins)
ssn := framework.OpenSession(pc.cache, pc.plugins, pc.configurations)
defer framework.CloseSession(ssn)

for _, action := range pc.actions {
Expand All @@ -98,7 +99,7 @@ func (pc *Scheduler) loadSchedulerConf() {
}
}

pc.actions, pc.plugins, err = loadSchedulerConf(schedConf)
pc.actions, pc.plugins, pc.configurations, err = loadSchedulerConf(schedConf)
if err != nil {
panic(err)
}
Expand Down
Loading