diff --git a/docs/design/drf.md b/docs/design/drf.md index f63aae0f43..c7c7b4d949 100644 --- a/docs/design/drf.md +++ b/docs/design/drf.md @@ -20,7 +20,7 @@ This share value is used for job ordering and task premption. ##### 1.1 Gang Scheduling with DRF in job ordering ( Gang -> DRF) - Gang scheduling sorts the job based on whether the job has atleast **minAvailable** task already (allocated + successfully completed + pipelined) or not. + Gang scheduling sorts the job based on whether the job has at least **minAvailable** task already (allocated + successfully completed + pipelined) or not. Jobs which has not met the minAvailable criteria has higher priority than jobs which has met the minAvailable criteria. diff --git a/pkg/scheduler/actions/shuffle/shuffle.go b/pkg/scheduler/actions/shuffle/shuffle.go new file mode 100644 index 0000000000..1bbfb5ba83 --- /dev/null +++ b/pkg/scheduler/actions/shuffle/shuffle.go @@ -0,0 +1,76 @@ +/* + Copyright 2022 The Volcano Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shuffle + +import ( + "k8s.io/klog" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/framework" +) + +const ( + Strategies = "strategies" + Shuffle = "shuffle" +) + +// Action defines the action +type Action struct{} + +// New returns the action instance +func New() *Action { + return &Action{} +} + +// Name returns the action name +func (alloc *Action) Name() string { + return Shuffle +} + +// Initialize inits the action +func (alloc *Action) Initialize() {} + +// Execute select evictees according given strategies and evict them. +func (alloc *Action) Execute(ssn *framework.Session) { + klog.V(3).Infof("Enter Shuffle ...\n") + defer klog.V(3).Infof("Leaving Shuffle ...\n") + + // select pods that may be evicted + tasks := make([]*api.TaskInfo, 0) + for _, jobInfo := range ssn.Jobs { + for _, taskInfo := range jobInfo.Tasks { + if taskInfo.Status == api.Running { + tasks = append(tasks, taskInfo) + } + } + } + for _, task := range tasks { + klog.V(4).Infof("Running tasks %s: [ns: %s, job: %s]\n", task.Name, task.Namespace, task.Job) + } + + // Evict target workloads + victims := ssn.Victims(tasks) + for victim := range victims { + klog.V(5).Infof("Victim %s: [ns: %s, job: %s]\n", victim.Name, victim.Namespace, victim.Job) + if err := ssn.Evict(victim, "shuffle"); err != nil { + klog.Errorf("Failed to evict Task <%s/%s>: %v", victim.Namespace, victim.Name, err) + continue + } + } +} + +// UnInitialize releases resource which are not useful. +func (alloc *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/shuffle/shuffle_test.go b/pkg/scheduler/actions/shuffle/shuffle_test.go new file mode 100644 index 0000000000..d3dd99be27 --- /dev/null +++ b/pkg/scheduler/actions/shuffle/shuffle_test.go @@ -0,0 +1,331 @@ +/* + Copyright 2022 The Volcano Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shuffle + +import ( + "testing" + "time" + + v1 "k8s.io/api/core/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + + schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/cache" + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/plugins/rescheduling" + "volcano.sh/volcano/pkg/scheduler/util" +) + +func TestShuffle(t *testing.T) { + framework.RegisterPluginBuilder("rescheduling", rescheduling.New) + defer framework.CleanupPluginBuilders() + util.FnsLastExecTime["reschedulingFns"] = time.Now() + defer delete(util.FnsLastExecTime, "reschedulingFns") + + var highPriority int32 + var lowPriority int32 + highPriority = 100000 + lowPriority = 10 + tests := []struct { + name string + podGroups []*schedulingv1beta1.PodGroup + pods []*v1.Pod + nodes []*v1.Node + queues []*schedulingv1beta1.Queue + interval string + expected int + }{ + { + name: "scheduled task test for rescheduling cycle", + nodes: []*v1.Node{ + util.BuildNode("node1", util.BuildResourceList("4", "8Gi"), make(map[string]string)), + util.BuildNode("node2", util.BuildResourceList("4", "8Gi"), make(map[string]string)), + }, + queues: []*schedulingv1beta1.Queue{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + Spec: schedulingv1beta1.QueueSpec{ + Weight: 1, + }, + }, + }, + podGroups: []*schedulingv1beta1.PodGroup{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg1", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg2", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg3", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg4", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + }, + pods: []*v1.Pod{ + util.BuildPod("test", "pod1", "node1", v1.PodRunning, util.BuildResourceList("1", "2G"), "pg1", make(map[string]string), make(map[string]string)), + util.BuildPod("test", "pod2", "node1", v1.PodRunning, util.BuildResourceList("1", "2G"), "pg2", make(map[string]string), make(map[string]string)), + util.BuildPod("test", "pod3", "node1", v1.PodRunning, util.BuildResourceList("1", "2G"), "pg3", make(map[string]string), make(map[string]string)), + util.BuildPod("test", "pod4", "node2", v1.PodRunning, util.BuildResourceList("1", "2G"), "pg4", make(map[string]string), make(map[string]string)), + }, 
+ interval: "5m", + expected: 0, + }, + { + name: "rescheduling works with lowNodeUtilization strategy on condition that 3 nodes and 6 different pods", + nodes: []*v1.Node{ + util.BuildNode("node1", util.BuildResourceList("4", "8Gi"), make(map[string]string)), + util.BuildNode("node2", util.BuildResourceList("4", "8Gi"), make(map[string]string)), + util.BuildNode("node3", util.BuildResourceList("4", "8Gi"), make(map[string]string)), + }, + queues: []*schedulingv1beta1.Queue{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + Spec: schedulingv1beta1.QueueSpec{ + Weight: 1, + }, + }, + }, + podGroups: []*schedulingv1beta1.PodGroup{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg1", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + PriorityClassName: "high-priority", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg2", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + PriorityClassName: "high-priority", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg3", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + PriorityClassName: "high-priority", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg4", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + PriorityClassName: "high-priority", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg5", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + PriorityClassName: "low-priority", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pg6", + Namespace: "test", + }, + Spec: schedulingv1beta1.PodGroupSpec{ + Queue: "default", + PriorityClassName: "high-priority", + }, + Status: schedulingv1beta1.PodGroupStatus{ + Phase: schedulingv1beta1.PodGroupRunning, + }, + }, + }, + pods: []*v1.Pod{ + util.BuildPod("test", "pod1", "node1", v1.PodRunning, util.BuildResourceList("1.5", "3G"), "pg1", make(map[string]string), make(map[string]string)), + util.BuildBestEffortPod("test", "pod2", "node1", v1.PodRunning, "pg2", make(map[string]string), make(map[string]string)), + util.BuildPod("test", "pod3", "node1", v1.PodRunning, util.BuildResourceList("2", "4G"), "pg3", make(map[string]string), make(map[string]string)), + util.BuildPodWithPriority("test", "pod4", "node2", v1.PodRunning, util.BuildResourceList("1", "2G"), "pg4", make(map[string]string), make(map[string]string), &highPriority), + util.BuildPodWithPriority("test", "pod5", "node2", v1.PodRunning, util.BuildResourceList("0.5", "1G"), "pg5", make(map[string]string), make(map[string]string), &lowPriority), + util.BuildPodWithPriority("test", "pod6", "node2", v1.PodRunning, util.BuildResourceList("1.5", "3G"), "pg6", make(map[string]string), make(map[string]string), &highPriority), + }, + interval: "1s", + expected: 3, + }, + } + shuffle := New() + + for i, test := range tests { + binder := &util.FakeBinder{ + Binds: map[string]string{}, + Channel: make(chan string, 1), + } + evictor := &util.FakeEvictor{ + Channel: make(chan 
string), + } + schedulerCache := &cache.SchedulerCache{ + Nodes: make(map[string]*api.NodeInfo), + Jobs: make(map[api.JobID]*api.JobInfo), + Queues: make(map[api.QueueID]*api.QueueInfo), + Binder: binder, + Evictor: evictor, + StatusUpdater: &util.FakeStatusUpdater{}, + VolumeBinder: &util.FakeVolumeBinder{}, + PriorityClasses: make(map[string]*schedulingv1.PriorityClass), + + Recorder: record.NewFakeRecorder(100), + } + schedulerCache.PriorityClasses["high-priority"] = &schedulingv1.PriorityClass{ + Value: highPriority, + } + schedulerCache.PriorityClasses["low-priority"] = &schedulingv1.PriorityClass{ + Value: lowPriority, + } + + for _, node := range test.nodes { + schedulerCache.AddNode(node) + } + for _, q := range test.queues { + schedulerCache.AddQueueV1beta1(q) + } + for _, ss := range test.podGroups { + schedulerCache.AddPodGroupV1beta1(ss) + } + for _, pod := range test.pods { + schedulerCache.AddPod(pod) + } + + time.Sleep(1 * time.Second) + trueValue := true + ssn := framework.OpenSession(schedulerCache, []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: "rescheduling", + EnabledVictim: &trueValue, + Arguments: map[string]interface{}{ + "interval": test.interval, + "strategies": []rescheduling.Strategy{ + { + Name: "lowNodeUtilization", + Parameters: map[string]interface{}{ + "thresholds": map[string]int{ + "cpu": 30, + "memory": 30, + "pods": 30, + }, + "targetThresholds": map[string]int{ + "cpu": 70, + "memory": 70, + "pods": 70, + }, + }, + }, + }, + }, + }, + }, + }, + }, nil) + defer framework.CloseSession(ssn) + + shuffle.Execute(ssn) + + for { + select { + case <-evictor.Channel: + case <-time.After(2 * time.Second): + goto LOOP + } + } + + LOOP: + if test.expected != len(evictor.Evicts()) { + t.Errorf("case %d (%s): expected: %v, got %v ", i, test.name, test.expected, len(evictor.Evicts())) + } + } +} diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index c13f5480de..a2b93554a2 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -34,6 +34,11 @@ func (o *AllocateFailError) Error() string { return o.Reason } +type NodeUsage struct { + CPUUsageAvg map[string]float64 + MemUsageAvg map[string]float64 +} + // NodeInfo is node level aggregated information. 
type NodeInfo struct { Name string @@ -52,8 +57,9 @@ type NodeInfo struct { // pods Used *Resource - Allocatable *Resource - Capability *Resource + Allocatable *Resource + Capability *Resource + ResourceUsage NodeUsage Tasks map[TaskID]*TaskInfo NumaInfo *NumatopoInfo @@ -100,8 +106,9 @@ func NewNodeInfo(node *v1.Node) *NodeInfo { Idle: EmptyResource(), Used: EmptyResource(), - Allocatable: EmptyResource(), - Capability: EmptyResource(), + Allocatable: EmptyResource(), + Capability: EmptyResource(), + ResourceUsage: NodeUsage{}, OversubscriptionResource: EmptyResource(), Tasks: make(map[TaskID]*TaskInfo), diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 8bcaa3e6cc..6f4986fcbe 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -171,3 +171,9 @@ type VictimTasksFn func() []*TaskInfo // AllocatableFn is the func declaration used to check whether the task can be allocated type AllocatableFn func(*QueueInfo, *TaskInfo) bool + +// VictimTasksFromCandidatesFn is the func declaration used to select victim tasks from candidates and evict them +type VictimTasksFromCandidatesFn func([]*TaskInfo) []*TaskInfo + +// UnderUsedResourceFn is the func declaration used to get under used resource list for queue +type UnderUsedResourceFn func(*QueueInfo) ResourceNameList diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index 76c7f91654..dafe18e58f 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -79,6 +79,7 @@ type PluginOption struct { EnabledVictim *bool `yaml:"enabledVictim"` // EnabledJobStarving defines whether jobStarvingFn is enabled EnabledJobStarving *bool `yaml:"enableJobStarving"` + // Arguments defines the different arguments that can be given to different plugins Arguments map[string]interface{} `yaml:"arguments"` } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index f16055e4d8..d128331833 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -83,6 +83,7 @@ type Session struct { reservedNodesFns map[string]api.ReservedNodesFn victimTasksFns map[string]api.VictimTasksFn jobStarvingFns map[string]api.ValidateFn + reschedulingFns map[string][]api.VictimTasksFromCandidatesFn } func openSession(cache cache.Cache) *Session { @@ -125,6 +126,7 @@ func openSession(cache cache.Cache) *Session { reservedNodesFns: map[string]api.ReservedNodesFn{}, victimTasksFns: map[string]api.VictimTasksFn{}, jobStarvingFns: map[string]api.ValidateFn{}, + reschedulingFns: map[string][]api.VictimTasksFromCandidatesFn{}, } snapshot := cache.Snapshot() @@ -184,6 +186,7 @@ func closeSession(ssn *Session) { ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil ssn.clusterOrderFns = nil + ssn.reschedulingFns = nil ssn.NodeList = nil ssn.TotalResource = nil diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index ae71633a56..f23132329b 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -18,7 +18,6 @@ package framework import ( k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" - "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/scheduler/api" @@ -139,6 +138,11 @@ func (ssn *Session) AddVictimTasksFns(name string, fn api.VictimTasksFn) { ssn.victimTasksFns[name] = fn } +// AddVictimTasksFromCandidatesFns add VictimTasksFromCandidatesFns function +func 
(ssn *Session) AddVictimTasksFromCandidatesFns(name string, fns []api.VictimTasksFromCandidatesFn) { + ssn.reschedulingFns[name] = fns +} + // AddJobStarvingFns add jobStarvingFns function func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) { ssn.jobStarvingFns[name] = fn @@ -503,6 +507,30 @@ func (ssn *Session) ReservedNodes() { } } +// Victims returns the victims for rescheduling +func (ssn *Session) Victims(tasks []*api.TaskInfo) map[*api.TaskInfo]bool { + // different filters may add the same task to victims, so use a map to remove duplicate tasks. + victimSet := make(map[*api.TaskInfo]bool) + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledVictim) { + continue + } + fns, found := ssn.reschedulingFns[plugin.Name] + if !found { + continue + } + for _, fn := range fns { + victimTasks := fn(tasks) + for _, victim := range victimTasks { + victimSet[victim] = true + } + } + } + } + return victimSet +} + // JobOrderFn invoke joborder function of the plugins func (ssn *Session) JobOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go index 9a04ae9d44..8caebf69de 100644 --- a/pkg/scheduler/plugins/factory.go +++ b/pkg/scheduler/plugins/factory.go @@ -29,6 +29,7 @@ import ( "volcano.sh/volcano/pkg/scheduler/plugins/predicates" "volcano.sh/volcano/pkg/scheduler/plugins/priority" "volcano.sh/volcano/pkg/scheduler/plugins/proportion" + "volcano.sh/volcano/pkg/scheduler/plugins/rescheduling" "volcano.sh/volcano/pkg/scheduler/plugins/reservation" "volcano.sh/volcano/pkg/scheduler/plugins/sla" tasktopology "volcano.sh/volcano/pkg/scheduler/plugins/task-topology" @@ -50,6 +51,7 @@ func init() { framework.RegisterPluginBuilder(sla.PluginName, sla.New) framework.RegisterPluginBuilder(tasktopology.PluginName, tasktopology.New) framework.RegisterPluginBuilder(numaaware.PluginName, numaaware.New) + framework.RegisterPluginBuilder(rescheduling.PluginName, rescheduling.New) // Plugins for Queues framework.RegisterPluginBuilder(proportion.PluginName, proportion.New) diff --git a/pkg/scheduler/plugins/rescheduling/low_node_utilization.go b/pkg/scheduler/plugins/rescheduling/low_node_utilization.go new file mode 100644 index 0000000000..a714ffc455 --- /dev/null +++ b/pkg/scheduler/plugins/rescheduling/low_node_utilization.go @@ -0,0 +1,211 @@ +/* +Copyright 2022 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package rescheduling + +import ( + "reflect" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog" + + "volcano.sh/volcano/pkg/scheduler/api" +) + +// DefaultLowNodeConf defines the default configuration for LNU strategy +var DefaultLowNodeConf = map[string]interface{}{ + "thresholds": map[string]float64{"cpu": 100, "memory": 100, "pods": 100}, + "targetThresholds": map[string]float64{"cpu": 100, "memory": 100, "pods": 100}, + "thresholdPriorityClassName": "system-cluster-critical", + "nodeFit": true, +} + +type LowNodeUtilizationConf struct { + Thresholds map[string]float64 + TargetThresholds map[string]float64 + NumberOfNodes int + ThresholdPriority int + ThresholdPriorityClassName string + NodeFit bool +} + +// NewLowNodeUtilizationConf returns a pointer to a LowNodeUtilizationConf object with default values +func NewLowNodeUtilizationConf() *LowNodeUtilizationConf { + return &LowNodeUtilizationConf{ + Thresholds: map[string]float64{"cpu": 100, "memory": 100, "pods": 100}, + TargetThresholds: map[string]float64{"cpu": 100, "memory": 100, "pods": 100}, + ThresholdPriorityClassName: "system-cluster-critical", + NodeFit: true, + } +} + +// parse converts the config map to a struct object +func (lnuc *LowNodeUtilizationConf) parse(configs map[string]interface{}) { + if len(configs) == 0 { + return + } + lowThresholdsConfigs, ok := configs["thresholds"] + if ok { + lowConfigs, _ := lowThresholdsConfigs.(map[string]int) + parseThreshold(lowConfigs, lnuc, "Thresholds") + } + targetThresholdsConfigs, ok := configs["targetThresholds"] + if ok { + targetConfigs, _ := targetThresholdsConfigs.(map[string]int) + parseThreshold(targetConfigs, lnuc, "TargetThresholds") + } +} + +func parseThreshold(thresholdsConfig map[string]int, lnuc *LowNodeUtilizationConf, param string) { + if len(thresholdsConfig) > 0 { + configValue := reflect.ValueOf(lnuc).Elem().FieldByName(param) + config := configValue.Interface().(map[string]float64) + + cpuThreshold, ok := thresholdsConfig["cpu"] + if ok { + config["cpu"] = float64(cpuThreshold) + } + memoryThreshold, ok := thresholdsConfig["memory"] + if ok { + config["memory"] = float64(memoryThreshold) + } + podThreshold, ok := thresholdsConfig["pods"] + if ok { + config["pods"] = float64(podThreshold) + } + } +} + +var victimsFnForLnu = func(tasks []*api.TaskInfo) []*api.TaskInfo { + victims := make([]*api.TaskInfo, 0) + + // parse configuration arguments + utilizationConfig := NewLowNodeUtilizationConf() + parametersConfig := RegisteredStrategyConfigs["lowNodeUtilization"] + var config map[string]interface{} + config, ok := parametersConfig.(map[string]interface{}) + if !ok { + klog.Error("parameters parse error for lowNodeUtilization") + return victims + } + utilizationConfig.parse(config) + klog.V(4).Infof("The configuration for lowNodeUtilization: %v", *utilizationConfig) + + // group the nodes into lowNodes and highNodes + nodeUtilizationList := getNodeUtilization() + klog.V(4).Infoln("The nodeUtilizationList:") + for _, nodeUtilization := range nodeUtilizationList { + klog.V(4).Infof("node: %s, utilization: %s \n", nodeUtilization.nodeInfo.Name, nodeUtilization.utilization) + for _, pod := range nodeUtilization.pods { + klog.V(4).Infof("pod: %s \n", pod.Name) + } + } + + lowNodes, highNodes := groupNodesByUtilization(nodeUtilizationList, lowThresholdFilter, highThresholdFilter, *utilizationConfig) + klog.V(4).Infoln("The low nodes:") + for _, node := range lowNodes { + klog.V(4).Infoln(node.nodeInfo.Name) + } + 
klog.V(4).Infoln("The high nodes:") + for _, node := range highNodes { + klog.V(4).Infoln(node.nodeInfo.Name) + } + if len(lowNodes) == 0 { + klog.V(4).Infof("The resource utilization of all nodes is above the threshold") + return victims + } + if len(lowNodes) == len(Session.Nodes) { + klog.V(4).Infof("The resource utilization of all nodes is below the threshold") + return victims + } + if len(highNodes) == 0 { + klog.V(4).Infof("The resource utilization of all nodes is below the target threshold") + return victims + } + + // select victims from the high-utilization (source) nodes + return evictPodsFromSourceNodes(highNodes, lowNodes, tasks, isContinueEvictPods, *utilizationConfig) +} + +// lowThresholdFilter filters nodes whose resource dimensions are all under the low utilization threshold +func lowThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{}) bool { + utilizationConfig := parseArgToConfig(config) + if utilizationConfig == nil { + klog.V(4).Infof("lack of LowNodeUtilizationConf pointer parameter") + return false + } + + if node.Spec.Unschedulable { + return false + } + nodeCapacity := getNodeCapacity(node) + for rName, usage := range usage.utilization { + if thresholdPercent, ok := utilizationConfig.Thresholds[string(rName)]; ok { + threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity) + if usage.Cmp(*threshold) == 1 { + return false + } + } + } + return true +} + +// highThresholdFilter filters nodes with at least one resource dimension above the target utilization threshold +func highThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{}) bool { + utilizationConfig := parseArgToConfig(config) + if utilizationConfig == nil { + klog.V(4).Infof("lack of LowNodeUtilizationConf pointer parameter") + return false + } + + nodeCapacity := getNodeCapacity(node) + for rName, usage := range usage.utilization { + if thresholdPercent, ok := utilizationConfig.TargetThresholds[string(rName)]; ok { + threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity) + if usage.Cmp(*threshold) == 1 { + return true + } + } + } + return false +} + +// isContinueEvictPods judges whether to continue selecting victim pods +func isContinueEvictPods(node *v1.Node, usage *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, config interface{}) bool { + var isNodeOverused bool + utilizationConfig := parseArgToConfig(config) + nodeCapacity := getNodeCapacity(node) + for rName, usage := range usage.utilization { + if thresholdPercent, ok := utilizationConfig.TargetThresholds[string(rName)]; ok { + threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity) + if usage.Cmp(*threshold) == 1 { + isNodeOverused = true + break + } + } + } + if !isNodeOverused { + return false + } + + for rName := range totalAllocatableResource { + if totalAllocatableResource[rName].CmpInt64(0) == 0 { + return false + } + } + return true +} diff --git a/pkg/scheduler/plugins/rescheduling/node_utilization_util.go b/pkg/scheduler/plugins/rescheduling/node_utilization_util.go new file mode 100644 index 0000000000..e8e0b9d202 --- /dev/null +++ b/pkg/scheduler/plugins/rescheduling/node_utilization_util.go @@ -0,0 +1,205 @@ +/* +Copyright 2022 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rescheduling + +import ( + "sort" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog" + v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" + + "volcano.sh/volcano/pkg/scheduler/api" +) + +const FiveMinutes = "5m" + +type NodeUtilization struct { + nodeInfo *v1.Node + utilization map[v1.ResourceName]*resource.Quantity + pods []*v1.Pod +} + +type thresholdFilter func(*v1.Node, *NodeUtilization, interface{}) bool + +type isContinueEviction func(node *v1.Node, usage *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, config interface{}) bool + +// groupNodesByUtilization divides the nodes into two groups by resource utilization filters +func groupNodesByUtilization(nodeUtilizationList []*NodeUtilization, lowThresholdFilter, highThresholdFilter thresholdFilter, config interface{}) ([]*NodeUtilization, []*NodeUtilization) { + lowNodes := make([]*NodeUtilization, 0) + highNodes := make([]*NodeUtilization, 0) + + for _, nodeUtilization := range nodeUtilizationList { + if lowThresholdFilter(nodeUtilization.nodeInfo, nodeUtilization, config) { + lowNodes = append(lowNodes, nodeUtilization) + } else if highThresholdFilter(nodeUtilization.nodeInfo, nodeUtilization, config) { + highNodes = append(highNodes, nodeUtilization) + } + } + + return lowNodes, highNodes +} + +// getNodeUtilization returns the resource utilization list of all nodes +func getNodeUtilization() []*NodeUtilization { + nodeUtilizationList := make([]*NodeUtilization, 0) + for _, nodeInfo := range Session.Nodes { + nodeUtilization := &NodeUtilization{ + nodeInfo: nodeInfo.Node, + utilization: map[v1.ResourceName]*resource.Quantity{}, + pods: nodeInfo.Pods(), + } + nodeUtilization.utilization[v1.ResourceCPU] = resource.NewMilliQuantity(int64(nodeInfo.ResourceUsage.CPUUsageAvg[FiveMinutes]), resource.DecimalSI) + nodeUtilization.utilization[v1.ResourceMemory] = resource.NewQuantity(int64(nodeInfo.ResourceUsage.MemUsageAvg[FiveMinutes]), resource.BinarySI) + nodeUtilizationList = append(nodeUtilizationList, nodeUtilization) + } + return nodeUtilizationList +} + +// evictPodsFromSourceNodes evicts pods from source nodes to target nodes according to priority and QoS +func evictPodsFromSourceNodes(sourceNodes, targetNodes []*NodeUtilization, tasks []*api.TaskInfo, evictionCon isContinueEviction, config interface{}) []*api.TaskInfo { + resourceNames := []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } + utilizationConfig := parseArgToConfig(config) + totalAllocatableResource := map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: {}, + v1.ResourceMemory: {}, + } + for _, node := range targetNodes { + nodeCapacity := getNodeCapacity(node.nodeInfo) + for _, rName := range resourceNames { + totalAllocatableResource[rName].Add(*getThresholdForNode(rName, utilizationConfig.TargetThresholds[string(rName)], nodeCapacity)) + totalAllocatableResource[rName].Sub(*node.utilization[rName]) + } + } + klog.V(4).Infof("totalAllocatableResource: %s", totalAllocatableResource) + + // sort the source nodes in descending order of utilization + sortNodes(sourceNodes, Session.Nodes) + 
klog.V(4).Infoln("sourceNodes:") + for _, node := range sourceNodes { + klog.V(4).Infoln(node.nodeInfo.Name) + } + + // victim selection algorithm: + // 1. Evict pods from nodes with high utilization to low utilization + // 2. Within a node, evict pods from low priority to high priority. If the priority is the same, evict pods according to QoS from low to high + victims := make([]*api.TaskInfo, 0) + for _, node := range sourceNodes { + if len(node.pods) == 0 { + klog.V(4).Infof("No pods can be removed on node: %s", node.nodeInfo.Name) + continue + } + sortPods(node.pods) + victims = append(victims, evict(node.pods, node, totalAllocatableResource, evictionCon, tasks, config)...) + } + return victims +} + +// parseArgToConfig returns a nodeUtilizationConfig object from parameters +// TODO: It is just for lowNodeUtilization now, which should be abstracted as a common function. +func parseArgToConfig(config interface{}) *LowNodeUtilizationConf { + var utilizationConfig *LowNodeUtilizationConf + if arg, ok := config.(LowNodeUtilizationConf); ok { + utilizationConfig = &arg + } + + return utilizationConfig +} + +// sortNodes sorts all the nodes according to the usage of CPU and memory with a weighted score +func sortNodes(nodeUtilizationList []*NodeUtilization, nodes map[string]*api.NodeInfo) { + cmpFn := func(i, j int) bool { + return getScoreForNode(i, nodeUtilizationList, nodes) > getScoreForNode(j, nodeUtilizationList, nodes) + } + sort.Slice(nodeUtilizationList, cmpFn) +} + +// getScoreForNode returns the score for a node, considering only CPU and memory +func getScoreForNode(index int, nodeUtilizationList []*NodeUtilization, nodes map[string]*api.NodeInfo) float64 { + nodeName := nodeUtilizationList[index].nodeInfo.Name + cpuScore := float64(nodeUtilizationList[index].utilization[v1.ResourceCPU].MilliValue()) / nodes[nodeName].Capability.MilliCPU + memoryScore := float64(nodeUtilizationList[index].utilization[v1.ResourceMemory].Value()) / nodes[nodeName].Capability.Memory + return cpuScore + memoryScore +} + +// getThresholdForNode returns the resource threshold for the given dimension +func getThresholdForNode(rName v1.ResourceName, thresholdPercent float64, nodeCapacity v1.ResourceList) *resource.Quantity { + var threshold *resource.Quantity + if rName == v1.ResourceCPU { + threshold = resource.NewMilliQuantity(int64(thresholdPercent*float64(nodeCapacity.Cpu().MilliValue())*0.01), resource.DecimalSI) + } else if rName == v1.ResourceMemory { + threshold = resource.NewQuantity(int64(thresholdPercent*float64(nodeCapacity.Memory().Value())*0.01), resource.BinarySI) + } + return threshold +} + +// getNodeCapacity returns the node's allocatable resources if set, otherwise its capacity +func getNodeCapacity(node *v1.Node) v1.ResourceList { + nodeCapacity := node.Status.Capacity + if len(node.Status.Allocatable) > 0 { + nodeCapacity = node.Status.Allocatable + } + return nodeCapacity +} + +// sortPods sorts the pods in order according to priority and QoS, lowest first +func sortPods(pods []*v1.Pod) { + cmp := func(i, j int) bool { + if pods[i].Spec.Priority == nil && pods[j].Spec.Priority != nil { + return true + } + if pods[j].Spec.Priority == nil && pods[i].Spec.Priority != nil { + return false + } + if (pods[j].Spec.Priority == nil && pods[i].Spec.Priority == nil) || (*pods[i].Spec.Priority == *pods[j].Spec.Priority) { + if v1qos.GetPodQOS(pods[i]) == v1.PodQOSBestEffort { + return true + } + if v1qos.GetPodQOS(pods[i]) == v1.PodQOSBurstable && v1qos.GetPodQOS(pods[j]) == v1.PodQOSGuaranteed { + return true + } + return false + } + return *pods[i].Spec.Priority < 
*pods[j].Spec.Priority + } + sort.Slice(pods, cmp) +} + +// evict selects victims and adds them to the eviction list +func evict(pods []*v1.Pod, utilization *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, continueEviction isContinueEviction, tasks []*api.TaskInfo, config interface{}) []*api.TaskInfo { + victims := make([]*api.TaskInfo, 0) + for _, pod := range pods { + if !continueEviction(utilization.nodeInfo, utilization, totalAllocatableResource, config) { + return victims + } + for _, task := range tasks { + if task.Pod.UID == pod.UID { + totalAllocatableResource[v1.ResourceCPU].Sub(*resource.NewMilliQuantity(int64(task.Resreq.MilliCPU), resource.DecimalSI)) + totalAllocatableResource[v1.ResourceMemory].Sub(*resource.NewQuantity(int64(task.Resreq.Memory), resource.BinarySI)) + utilization.utilization[v1.ResourceCPU].Sub(*resource.NewMilliQuantity(int64(task.Resreq.MilliCPU), resource.DecimalSI)) + utilization.utilization[v1.ResourceMemory].Sub(*resource.NewQuantity(int64(task.Resreq.Memory), resource.BinarySI)) + victims = append(victims, task) + break + } + } + } + return victims +} diff --git a/pkg/scheduler/plugins/rescheduling/rescheduling.go b/pkg/scheduler/plugins/rescheduling/rescheduling.go new file mode 100644 index 0000000000..43e5f946b5 --- /dev/null +++ b/pkg/scheduler/plugins/rescheduling/rescheduling.go @@ -0,0 +1,170 @@ +/* +Copyright 2022 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rescheduling + +import ( + "time" + + "k8s.io/klog" + + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/util" +) + +const ( + // PluginName indicates the name of the volcano scheduler plugin + PluginName = "rescheduling" + // DefaultInterval indicates the default interval at which the rescheduling plugin works + DefaultInterval = 5 * time.Minute + // DefaultStrategy indicates the default strategy the rescheduling plugin makes use of + DefaultStrategy = "lowNodeUtilization" +) + +var ( + // Session contains all the data in the session object, which will be used by the whole rescheduling package + Session *framework.Session + + // RegisteredStrategies collects all the strategies registered. + RegisteredStrategies []string + + // RegisteredStrategyConfigs collects all the strategy configurations registered. 
+ RegisteredStrategyConfigs map[string]interface{} + + // VictimFns contains all the victimFns for the registered strategies + VictimFns map[string]api.VictimTasksFromCandidatesFn +) + +func init() { + RegisteredStrategies = make([]string, 0) + RegisteredStrategyConfigs = make(map[string]interface{}) + VictimFns = make(map[string]api.VictimTasksFromCandidatesFn) + + // register victim functions for all strategies here + VictimFns["lowNodeUtilization"] = victimsFnForLnu +} + +type reschedulingPlugin struct { + // Arguments given for the rescheduling plugin + pluginArguments framework.Arguments +} + +// New returns a rescheduling plugin object +func New(arguments framework.Arguments) framework.Plugin { + return &reschedulingPlugin{ + pluginArguments: arguments, + } +} + +// Name returns the name of the rescheduling plugin +func (rp *reschedulingPlugin) Name() string { + return PluginName +} + +func (rp *reschedulingPlugin) OnSessionOpen(ssn *framework.Session) { + klog.V(4).Infof("Enter rescheduling plugin ...") + defer klog.V(4).Infof("Leaving rescheduling plugin.") + + // Parse all the rescheduling strategies and the execution interval + Session = ssn + configs := NewReschedulingConfigs() + for _, tier := range ssn.Tiers { + for _, pluginOption := range tier.Plugins { + if pluginOption.Name == PluginName { + configs.parseArguments(pluginOption.Arguments) + break + } + } + } + klog.V(4).Infof("rescheduling config: %v", configs) + + // Judge whether it is time to execute rescheduling now + if !util.IsToBeExecuted("reschedulingFns", configs.interval) { + klog.V(4).Infof("It is not the time to execute rescheduling strategies.") + return + } + + // Get all strategies and register the VictimTasksFromCandidatesFns + victimFns := make([]api.VictimTasksFromCandidatesFn, 0) + for _, strategy := range configs.strategies { + victimFns = append(victimFns, VictimFns[strategy.Name]) + } + ssn.AddVictimTasksFromCandidatesFns(rp.Name(), victimFns) +} + +func (rp *reschedulingPlugin) OnSessionClose(ssn *framework.Session) { + Session = nil + RegisteredStrategies = RegisteredStrategies[0:0] + for k := range RegisteredStrategyConfigs { + delete(RegisteredStrategyConfigs, k) + } + VictimFns = nil +} + +// ReschedulingConfigs is the struct for rescheduling plugin arguments +type ReschedulingConfigs struct { + interval time.Duration + strategies []Strategy +} + +// Strategy is the struct for a rescheduling strategy +type Strategy struct { + Name string + Parameters map[string]interface{} +} + +// NewReschedulingConfigs creates a rescheduling configuration object with default values +func NewReschedulingConfigs() *ReschedulingConfigs { + config := &ReschedulingConfigs{ + interval: DefaultInterval, + strategies: []Strategy{ + { + Name: DefaultStrategy, + Parameters: DefaultLowNodeConf, + }, + }, + } + RegisteredStrategies = append(RegisteredStrategies, DefaultStrategy) + RegisteredStrategyConfigs[DefaultStrategy] = DefaultLowNodeConf + return config +} + +// parseArguments parses all the rescheduling arguments +func (rc *ReschedulingConfigs) parseArguments(arguments framework.Arguments) { + var intervalStr string + var err error + if intervalArg, ok := arguments["interval"]; ok { + intervalStr = intervalArg.(string) + } + rc.interval, err = time.ParseDuration(intervalStr) + if err != nil { + klog.V(4).Infof("Parse rescheduling interval failed. 
Reset the interval to 5m by default.") + rc.interval = DefaultInterval + } + strategies, ok := arguments["strategies"] + if ok { + rc.strategies = strategies.([]Strategy) + RegisteredStrategies = RegisteredStrategies[0:0] + for k := range RegisteredStrategyConfigs { + delete(RegisteredStrategyConfigs, k) + } + for _, strategy := range rc.strategies { + RegisteredStrategies = append(RegisteredStrategies, strategy.Name) + RegisteredStrategyConfigs[strategy.Name] = strategy.Parameters + } + } +} diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index 9172469119..0645944ad0 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -23,6 +23,7 @@ import ( "math/rand" "sort" "sync" + "time" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -39,8 +40,23 @@ var lastProcessedNodeIndex int // Reservation is used to record target job and locked nodes var Reservation *ResourceReservation +// FnsLastExecTime records the last execution time of functions registering in the plugins and executing periodically +var FnsLastExecTime map[string]time.Time + func init() { Reservation = NewResourceReservation() + FnsLastExecTime = make(map[string]time.Time) +} + +// IsToBeExecuted checks whether it is time for the function to be executed now +func IsToBeExecuted(fnName string, interval time.Duration) bool { + now := time.Now() + lastExecuteTime, ok := FnsLastExecTime[fnName] + if !ok || lastExecuteTime.Add(interval).Before(now) { + FnsLastExecTime[fnName] = now + return true + } + return false } // CalculateNumOfFeasibleNodesToFind returns the number of feasible nodes that once found, diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index a853c31204..9a0eabc0f4 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -68,8 +68,8 @@ func BuildNode(name string, alloc v1.ResourceList, labels map[string]string) *v1 } } -// BuildPod builts Pod object -func BuildPod(namespace, name, nodename string, p v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string) *v1.Pod { +// BuildPod builds a Burstable pod object +func BuildPod(namespace, name, nodeName string, p v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), @@ -84,7 +84,7 @@ func BuildPod(namespace, name, nodename string, p v1.PodPhase, req v1.ResourceLi Phase: p, }, Spec: v1.PodSpec{ - NodeName: nodename, + NodeName: nodeName, NodeSelector: selector, Containers: []v1.Container{ { @@ -193,6 +193,65 @@ func BuildDynamicPVC(namespace, name string, req v1.ResourceList) (*v1.Persisten return pvc, pv, sc } +// BuildBestEffortPod builds a BestEffort pod object +func BuildBestEffortPod(namespace, name, nodeName string, p v1.PodPhase, groupName string, labels map[string]string, selector map[string]string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + Name: name, + Namespace: namespace, + Labels: labels, + Annotations: map[string]string{ + schedulingv2.KubeGroupNameAnnotationKey: groupName, + }, + }, + Status: v1.PodStatus{ + Phase: p, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + NodeSelector: selector, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{}, + }, + }, + }, + }, + } +} + +// 
BuildPodWithPriority builds a pod object with priority +func BuildPodWithPriority(namespace, name, nodeName string, p v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string, priority *int32) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + Name: name, + Namespace: namespace, + Labels: labels, + Annotations: map[string]string{ + schedulingv2.KubeGroupNameAnnotationKey: groupName, + }, + }, + Status: v1.PodStatus{ + Phase: p, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + NodeSelector: selector, + Priority: priority, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: req, + }, + }, + }, + }, + } +} + // FakeBinder is used as fake binder type FakeBinder struct { Binds map[string]string