Merge branch 'master' into master

QingyaFan committed Aug 8, 2024
2 parents 4a52ca4 + ab23490 commit b7f6e4a
Showing 10 changed files with 432 additions and 116 deletions.
20 changes: 15 additions & 5 deletions cmd/controller-manager/app/options/options.go
@@ -22,6 +22,7 @@ import (
"strings"

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/component-base/config"
componentbaseconfigvalidation "k8s.io/component-base/config/validation"
@@ -120,14 +121,23 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
"'-' to disable controllers, e.g. \"-job-controller,-queue-controller\" to disable job and queue controllers.", knownControllers))
}

// CheckOptionOrDie checks the option and returns error if it's invalid
// CheckOptionOrDie checks all options and, if any are invalid, aggregates the errors and returns them.
func (s *ServerOption) CheckOptionOrDie() error {
// check controllers option
var allErrors []error

// Check controllers option
if err := s.checkControllers(); err != nil {
return err
allErrors = append(allErrors, err)
}

// Check leader election flag when LeaderElection is enabled.
leaderElectionErr := componentbaseconfigvalidation.ValidateLeaderElectionConfiguration(
&s.LeaderElection, field.NewPath("leaderElection")).ToAggregate()
if leaderElectionErr != nil {
allErrors = append(allErrors, leaderElectionErr)
}
// check leader election flag when LeaderElection is enabled.
return componentbaseconfigvalidation.ValidateLeaderElectionConfiguration(&s.LeaderElection, field.NewPath("leaderElection")).ToAggregate()
return errors.NewAggregate(allErrors)
}

// checkControllers checks the controllers option and returns error if it's invalid
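
The change above switches CheckOptionOrDie from failing on the first invalid option to collecting every problem before returning. A minimal standalone sketch of that aggregation pattern with k8s.io/apimachinery's NewAggregate (the two error messages are made up for illustration):

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// Collect every validation failure instead of returning on the first one.
	var allErrors []error
	allErrors = append(allErrors, fmt.Errorf("unknown controller %q", "foo-controller"))
	allErrors = append(allErrors, fmt.Errorf("leaderElection.leaseDuration must be greater than zero"))

	// NewAggregate returns nil for an empty slice, so the result can be
	// handled like any ordinary error value by the caller.
	if err := utilerrors.NewAggregate(allErrors); err != nil {
		fmt.Println(err) // both messages are reported in a single error
	}
}
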
36 changes: 36 additions & 0 deletions pkg/scheduler/api/resource_info.go
@@ -429,6 +429,42 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) b
return true
}

// LessEqualWithDimension only compares the resource dimensions listed in the req param.
// @param req defines the resource dimensions to be compared
// if req is nil, this is equivalent to r.LessEqual(rr, Zero)
func (r *Resource) LessEqualWithDimension(rr *Resource, req *Resource) bool {
if r == nil {
return true
}
if rr == nil {
return false
}
if req == nil {
return r.LessEqual(rr, Zero)
}

if req.MilliCPU > 0 && r.MilliCPU > rr.MilliCPU {
return false
}
if req.Memory > 0 && r.Memory > rr.Memory {
return false
}

// if r.ScalarResources is nil, r is less than or equal to rr regardless of rr.ScalarResources
if r.ScalarResources == nil {
return true
}

for name, quant := range req.ScalarResources {
rQuant := r.ScalarResources[name]
rrQuant := rr.ScalarResources[name]
if quant > 0 && rQuant > rrQuant {
return false
}
}
return true
}

// LessEqualWithResourcesName returns true and an empty []string only when every resource dimension in r is less than or equal to that of rr.
// Otherwise it returns false and a slice of strings describing which resources are insufficient.
// @param defaultValue "default value for resource dimension not defined in ScalarResources. Its value can only be one of 'Zero' and 'Infinity'"
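
One plausible use of the new dimension-scoped comparison, sketched as a hypothetical fitsOnNode helper (the helper itself and the Clone/Add calls are assumptions for illustration, not part of this diff):

// fitsOnNode reports whether adding a task's request keeps a node within its
// allocatable capacity, comparing only the dimensions the task actually requests.
func fitsOnNode(used, request, allocatable *Resource) bool {
	// Clone so the node's running total is not mutated by the check.
	projected := used.Clone().Add(request)
	// Dimensions absent from request (for example an unrelated scalar resource
	// that is already over-committed) do not block the fit check.
	return projected.LessEqualWithDimension(allocatable, request)
}
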
90 changes: 90 additions & 0 deletions pkg/scheduler/api/resource_info_test.go
@@ -753,6 +753,96 @@ func TestLessEqual(t *testing.T) {
}
}

func TestLessEqualWithDimension(t *testing.T) {
tests := []struct {
resource1 *Resource
resource2 *Resource
req *Resource
expected bool
}{
{
resource1: &Resource{},
resource2: &Resource{},
req: nil,
expected: true,
},
{
resource1: &Resource{MilliCPU: 5000},
resource2: &Resource{
MilliCPU: 4000,
Memory: 2000,
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000},
},
req: &Resource{MilliCPU: 1000},
expected: false,
},
{
resource1: &Resource{MilliCPU: 3000, Memory: 3000},
resource2: &Resource{MilliCPU: 4000, Memory: 2000},
req: &Resource{Memory: 1000},
expected: false,
},
{
resource1: &Resource{
MilliCPU: 4,
Memory: 4000,
},
resource2: &Resource{},
req: &Resource{},
expected: true,
},
{
resource1: &Resource{
MilliCPU: 4, Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1},
},
resource2: &Resource{MilliCPU: 8, Memory: 8000},
req: &Resource{
MilliCPU: 4, Memory: 2000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1},
},
expected: false,
},
{
resource1: &Resource{
MilliCPU: 10, Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1},
},
resource2: &Resource{
MilliCPU: 100, Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/A100": 1},
},
req: &Resource{
MilliCPU: 10, Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 0, "nvidia.com/A100": 1, "scalar": 1},
},
expected: true,
},
{
resource1: &Resource{
MilliCPU: 110, Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1, "nvidia.com/A100": 1, "scalar": 1},
},
resource2: &Resource{
MilliCPU: 100, Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/A100": 1, "scalar": 1},
},
req: &Resource{
Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 0, "nvidia.com/A100": 1, "scalar": 1},
},
expected: true,
},
}

for i, test := range tests {
flag := test.resource1.LessEqualWithDimension(test.resource2, test.req)
if !equality.Semantic.DeepEqual(test.expected, flag) {
t.Errorf("Case %v: expected: %#v, got: %#v", i, test.expected, flag)
}
}
}

func TestLessPartly(t *testing.T) {
testsForDefaultZero := []struct {
resource1 *Resource
29 changes: 14 additions & 15 deletions pkg/scheduler/cache/cache.go
@@ -183,9 +183,8 @@ type DefaultBinder struct {
}

// Bind sends bind requests to the API server
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, []error) {
var errTasks []*schedulingapi.TaskInfo
var errs []error
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) map[schedulingapi.TaskID]string {
errMsg := make(map[schedulingapi.TaskID]string)
for _, task := range tasks {
p := task.Pod
if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
@@ -198,19 +197,13 @@ func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*scheduli
},
metav1.CreateOptions{}); err != nil {
klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err)
errTasks = append(errTasks, task)
errs = append(errs, err)
errMsg[task.UID] = err.Error()
} else {
db.recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName)
metrics.UpdateTaskScheduleDuration(metrics.Duration(p.CreationTimestamp.Time)) // update metrics as soon as the pod is bound
}
}

if len(errTasks) > 0 {
return errTasks, errs
}

return nil, nil
return errMsg
}

// NewDefaultBinder creates a binder with a kube client and an event recorder; it supports a fake binder if passed a fake client and a fake event recorder
@@ -892,12 +885,18 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string)
// Bind binds task to the target host.
func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) {
tmp := time.Now()
errTasks, errs := sc.Binder.Bind(sc.kubeClient, tasks)
if errs == nil {
errMsg := sc.Binder.Bind(sc.kubeClient, tasks)
if len(errMsg) == 0 {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
} else {
for i, task := range errTasks {
unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, errs[i])
klog.V(3).Infof("There are %d tasks in total and %d binds failed, latency %v", len(tasks), len(errMsg), time.Since(tmp))
}

for _, task := range tasks {
if reason, ok := errMsg[task.UID]; !ok {
sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName)
} else {
unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, reason)
if err := sc.taskUnschedulable(task, schedulingapi.PodReasonSchedulerError, unschedulableMsg, ""); err != nil {
klog.ErrorS(err, "Failed to update pod status when bind task error", "task", task.Name)
}
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
@@ -99,7 +99,7 @@ type VolumeBinder interface {

// Binder interface for binding tasks to hostnames
type Binder interface {
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error)
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string
}

// Evictor interface for evicting pods
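
Because Binder now reports failures as a map keyed by task ID, a test double only needs to fill in that map for the tasks it wants to fail. A rough sketch (fakeBinder and its failOn field are invented names for illustration):

// fakeBinder records which tasks it saw and returns a per-task error message
// keyed by TaskID, matching the new Binder contract.
type fakeBinder struct {
	failOn map[api.TaskID]string // tasks that should fail to bind, with a reason
	bound  []api.TaskID          // tasks that were "bound" successfully
}

func (fb *fakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string {
	errMsg := make(map[api.TaskID]string)
	for _, task := range tasks {
		if reason, ok := fb.failOn[task.UID]; ok {
			errMsg[task.UID] = reason
			continue
		}
		fb.bound = append(fb.bound, task.UID)
	}
	// An empty map means every task was bound successfully.
	return errMsg
}
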
20 changes: 10 additions & 10 deletions pkg/scheduler/metrics/metrics.go
@@ -40,7 +40,7 @@ var (
Subsystem: VolcanoNamespace,
Name: "e2e_scheduling_latency_milliseconds",
Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
Buckets: prometheus.ExponentialBuckets(5, 2, 15),
},
)

@@ -83,18 +83,18 @@ var (
pluginSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: VolcanoNamespace,
Name: "plugin_scheduling_latency_microseconds",
Help: "Plugin scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
Name: "plugin_scheduling_latency_milliseconds",
Help: "Plugin scheduling latency in milliseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 15),
}, []string{"plugin", "OnSession"},
)

actionSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: VolcanoNamespace,
Name: "action_scheduling_latency_microseconds",
Help: "Action scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
Name: "action_scheduling_latency_milliseconds",
Help: "Action scheduling latency in milliseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 15),
}, []string{"action"},
)

@@ -103,7 +103,7 @@ var (
Subsystem: VolcanoNamespace,
Name: "task_scheduling_latency_milliseconds",
Help: "Task scheduling latency in milliseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
Buckets: prometheus.ExponentialBuckets(5, 2, 15),
},
)

@@ -150,12 +150,12 @@ var (

// UpdatePluginDuration updates latency for every plugin
func UpdatePluginDuration(pluginName, onSessionStatus string, duration time.Duration) {
pluginSchedulingLatency.WithLabelValues(pluginName, onSessionStatus).Observe(DurationInMicroseconds(duration))
pluginSchedulingLatency.WithLabelValues(pluginName, onSessionStatus).Observe(DurationInMilliseconds(duration))
}

// UpdateActionDuration updates latency for every action
func UpdateActionDuration(actionName string, duration time.Duration) {
actionSchedulingLatency.WithLabelValues(actionName).Observe(DurationInMicroseconds(duration))
actionSchedulingLatency.WithLabelValues(actionName).Observe(DurationInMilliseconds(duration))
}

// UpdateE2eDuration updates entire end to end scheduling latency
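
For context on the bucket change: ExponentialBuckets(start, factor, count) doubles the upper bound at each step, so the old layout topped out at 5 * 2^9 = 2560 ms (~2.6 s) while the new one reaches 5 * 2^14 = 81920 ms (~82 s). A quick sketch that prints both layouts:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Old layout: 10 buckets, highest finite bound 2560 ms.
	fmt.Println(prometheus.ExponentialBuckets(5, 2, 10))
	// New layout: 15 buckets, highest finite bound 81920 ms, so slow scheduling
	// rounds still land in a finite bucket instead of the +Inf bucket.
	fmt.Println(prometheus.ExponentialBuckets(5, 2, 15))
}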