[cherry-pick for release-1.8] remove node out of sync state #3006

Merged 1 commit on Jul 28, 2023.
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -99,8 +99,8 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 	allNodes := ssn.NodeList
 	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
 		// Check for Resource Predicate
-		if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
-			return nil, api.NewFitError(task, node, reason)
+		if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok {
+			return nil, api.NewFitError(task, node, api.WrapInsufficientResourceReason(resources))
 		}
 		var statusSets util.StatusSets
 		statusSets, err := ssn.PredicateFn(task, node)
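For readers tracking the API change: the predicate now receives a slice of insufficient resource names from LessEqualWithResourcesName and funnels it through the new api.WrapInsufficientResourceReason helper (added in pkg/scheduler/api/unschedule_info.go below), which keeps only the first name so the FitError text stays in the familiar "Insufficient cpu" form. A minimal, self-contained sketch of that flow; checkResources and the plain maps here are simplified stand-ins, not the scheduler's real TaskInfo/NodeInfo API:

package main

import "fmt"

// wrapInsufficientResourceReason mirrors the helper this PR adds in
// unschedule_info.go: only the first insufficient resource is reported,
// which keeps the old single-resource message format.
func wrapInsufficientResourceReason(resources []string) string {
	if len(resources) == 0 {
		return ""
	}
	return "Insufficient " + resources[0]
}

// checkResources is a stand-in for InitResreq.LessEqualWithResourcesName:
// it checks every dimension (in a fixed order here) and collects the names
// of all that do not fit, instead of stopping at the first failure.
func checkResources(req, idle map[string]float64) (bool, []string) {
	var short []string
	for _, name := range []string{"cpu", "memory"} {
		if req[name] > idle[name] { // a missing dimension counts as zero, as with api.Zero
			short = append(short, name)
		}
	}
	return len(short) == 0, short
}

func main() {
	req := map[string]float64{"cpu": 2000, "memory": 4e9}
	idle := map[string]float64{"cpu": 1000, "memory": 2e9}
	if ok, resources := checkResources(req, idle); !ok {
		// allocate.go passes this string to api.NewFitError(task, node, ...)
		fmt.Println(wrapInsufficientResourceReason(resources)) // Insufficient cpu
	}
}

Discarding all but the first name keeps the scheduler's fit-failure messages identical to the old LessEqualWithReason output, while the full slice remains available for the structured log lines added in node_info.go below.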
44 changes: 22 additions & 22 deletions pkg/scheduler/api/node_info.go
@@ -291,12 +291,8 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
 	}

 	// set NodeState according to resources
-	if !ni.Used.LessEqual(ni.Allocatable, Zero) {
-		ni.State = NodeState{
-			Phase:  NotReady,
-			Reason: "OutOfSync",
-		}
-		return
+	if ok, resources := ni.Used.LessEqualWithResourcesName(ni.Allocatable, Zero); !ok {
+		klog.ErrorS(nil, "Node out of sync", "name", ni.Name, "resources", resources)
 	}

 	// If node not ready, e.g. power off
@@ -372,30 +368,30 @@ func (ni *NodeInfo) setNode(node *v1.Node) {
 	for _, ti := range ni.Tasks {
 		switch ti.Status {
 		case Releasing:
-			ni.Idle.sub(ti.Resreq) // sub without assertion
+			ni.allocateIdleResource(ti)
 			ni.Releasing.Add(ti.Resreq)
 			ni.Used.Add(ti.Resreq)
 			ni.addResource(ti.Pod)
 		case Pipelined:
 			ni.Pipelined.Add(ti.Resreq)
 		default:
-			ni.Idle.sub(ti.Resreq) // sub without assertion
+			ni.allocateIdleResource(ti)
 			ni.Used.Add(ti.Resreq)
 			ni.addResource(ti.Pod)
 		}
 	}
 }

-func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) error {
-	if ti.Resreq.LessEqual(ni.Idle, Zero) {
-		ni.Idle.Sub(ti.Resreq)
-		return nil
+func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) {
+	ok, resources := ti.Resreq.LessEqualWithResourcesName(ni.Idle, Zero)
+	if ok {
+		ni.Idle.sub(ti.Resreq)
+		return
 	}

-	return &AllocateFailError{Reason: fmt.Sprintf(
-		"cannot allocate resource, <%s> idle: %s <%s/%s> req: %s",
-		ni.Name, ni.Idle.String(), ti.Namespace, ti.Name, ti.Resreq.String(),
-	)}
+	ni.Idle.sub(ti.Resreq)
+	klog.ErrorS(nil, "Idle resources turn into negative after allocated",
+		"nodeName", ni.Name, "task", klog.KObj(ti.Pod), "resources", resources, "idle", ni.Idle.String(), "req", ti.Resreq.String())
 }

 // AddTask is used to add a task in nodeInfo object
@@ -420,18 +416,22 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
 	if ni.Node != nil {
 		switch ti.Status {
 		case Releasing:
-			if err := ni.allocateIdleResource(ti); err != nil {
-				return err
-			}
+			ni.allocateIdleResource(ti)
 			ni.Releasing.Add(ti.Resreq)
 			ni.Used.Add(ti.Resreq)
 			ni.addResource(ti.Pod)
 		case Pipelined:
 			ni.Pipelined.Add(ti.Resreq)
-		default:
-			if err := ni.allocateIdleResource(ti); err != nil {
-				return err
-			}
+		case Binding:
+			// A task in Binding status is about to bind to this node; double-check that idle resources are enough to place it before binding to the apiserver.
+			if ok, resNames := ti.Resreq.LessEqualWithResourcesName(ni.Idle, Zero); !ok {
+				return fmt.Errorf("node %s resources %v are not enough to put task <%s/%s>, idle: %s, req: %s", ni.Name, resNames, ti.Namespace, ti.Name, ni.Idle.String(), ti.Resreq.String())
+			}
+			ni.allocateIdleResource(ti)
+			ni.Used.Add(ti.Resreq)
+			ni.addResource(ti.Pod)
+		default:
+			ni.allocateIdleResource(ti)
 			ni.Used.Add(ti.Resreq)
 			ni.addResource(ti.Pod)
 		}
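The net effect of these node_info.go changes: a node whose already-running pods exceed its allocatable no longer flips to NotReady/OutOfSync. The scheduler logs the discrepancy, lets Idle go negative, and keeps the node schedulable; only a task in Binding status, which has not reached the apiserver yet, is still rejected outright. A self-contained sketch of that contract, using simplified stand-ins for the real NodeInfo and Resource types:

package main

import "fmt"

// res is a simplified stand-in for the scheduler's Resource type.
type res struct{ milliCPU, memory float64 }

func (r res) lessEqual(o res) (bool, []string) {
	var short []string
	if r.milliCPU > o.milliCPU {
		short = append(short, "cpu")
	}
	if r.memory > o.memory {
		short = append(short, "memory")
	}
	return len(short) == 0, short
}

func (r *res) sub(o res) { r.milliCPU -= o.milliCPU; r.memory -= o.memory }

type node struct {
	name string
	idle res
}

// allocateIdleResource mirrors the new behavior: the subtraction always
// happens; a shortfall is only logged, so idle may go negative and the
// node stays Ready instead of being marked OutOfSync.
func (n *node) allocateIdleResource(req res) {
	if ok, short := req.lessEqual(n.idle); !ok {
		fmt.Printf("idle resources turn negative after allocation: node=%s resources=%v\n", n.name, short)
	}
	n.idle.sub(req)
}

// addBindingTask mirrors the new Binding branch in AddTask: a task that has
// not bound to the apiserver yet can still be rejected before accounting.
func (n *node) addBindingTask(req res) error {
	if ok, short := req.lessEqual(n.idle); !ok {
		return fmt.Errorf("node %s resources %v are not enough to put task", n.name, short)
	}
	n.allocateIdleResource(req)
	return nil
}

func main() {
	n := &node{name: "n1", idle: res{milliCPU: 1000, memory: 1e9}}
	n.allocateIdleResource(res{milliCPU: 2000, memory: 2e9}) // already-running pod: account it anyway
	fmt.Printf("idle after sync: %+v\n", n.idle)             // negative, but the node stays Ready
	fmt.Println(n.addBindingTask(res{milliCPU: 500, memory: 1e8})) // pre-bind task: rejected
}

The asymmetry is the point of the change: a running pod's resources are already committed on the node, so refusing to account for them would only hide the overcommit, whereas a Binding task can still be failed cheaply before the bind call.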
53 changes: 39 additions & 14 deletions pkg/scheduler/api/node_info_test.go
@@ -76,29 +76,31 @@ func TestNodeInfo_AddPod(t *testing.T) {
 			},
 		},
 		{
-			name: "add 1 unknown pod",
+			name: "add 1 unknown pod and pod memory req > idle",
 			node: case02Node,
 			pods: []*v1.Pod{case02Pod1},
 			expected: &NodeInfo{
 				Name: "n2",
 				Node: case02Node,
-				Idle: buildResource("2000m", "1G"),
-				Used: EmptyResource(),
+				Idle: buildResource("1000m", "-1G"),
+				Used: buildResource("1000m", "2G"),
 				Releasing: EmptyResource(),
 				Pipelined: EmptyResource(),
 				OversubscriptionResource: EmptyResource(),
 				Allocatable: buildResource("2000m", "1G"),
 				Capacity: buildResource("2000m", "1G"),
 				ResourceUsage: &NodeUsage{},
 				State: NodeState{Phase: Ready},
-				Tasks: map[TaskID]*TaskInfo{},
+				Tasks: map[TaskID]*TaskInfo{
+					"c2/p1": NewTaskInfo(case02Pod1),
+				},
 				Others: map[string]interface{}{
 					GPUSharingDevice: gpushare.NewGPUDevices("n2", case01Node),
 					vgpu.DeviceName:  vgpu.NewGPUDevices("n2", case01Node),
 				},
 				ImageStates: make(map[string]*k8sframework.ImageStateSummary),
 			},
-			expectedFailure: true,
+			expectedFailure: false,
 		},
 	}

@@ -196,18 +198,42 @@ func TestNodeInfo_SetNode(t *testing.T) {
 	case01Pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("6", "6G"), []metav1.OwnerReference{}, make(map[string]string))

 	tests := []struct {
-		name     string
-		node     *v1.Node
-		updated  *v1.Node
-		pods     []*v1.Pod
-		expected *NodeInfo
+		name      string
+		node      *v1.Node
+		updated   *v1.Node
+		pods      []*v1.Pod
+		expected  *NodeInfo
+		expected2 *NodeInfo
 	}{
 		{
 			name:    "add 3 running non-owner pod",
 			node:    case01Node1,
 			updated: case01Node2,
 			pods:    []*v1.Pod{case01Pod1, case01Pod2, case01Pod3},
 			expected: &NodeInfo{
+				Name: "n1",
+				Node: case01Node2,
+				Idle: buildResource("-1", "-1G"),
+				Used: buildResource("9", "9G"),
+				OversubscriptionResource: EmptyResource(),
+				Releasing: EmptyResource(),
+				Pipelined: EmptyResource(),
+				Allocatable: buildResource("8", "8G"),
+				Capacity: buildResource("8", "8G"),
+				ResourceUsage: &NodeUsage{},
+				State: NodeState{Phase: Ready, Reason: ""},
+				Tasks: map[TaskID]*TaskInfo{
+					"c1/p1": NewTaskInfo(case01Pod1),
+					"c1/p2": NewTaskInfo(case01Pod2),
+					"c1/p3": NewTaskInfo(case01Pod3),
+				},
+				Others: map[string]interface{}{
+					GPUSharingDevice: gpushare.NewGPUDevices("n1", case01Node1),
+					vgpu.DeviceName:  vgpu.NewGPUDevices("n1", case01Node1),
+				},
+				ImageStates: make(map[string]*k8sframework.ImageStateSummary),
+			},
+			expected2: &NodeInfo{
 				Name: "n1",
 				Node: case01Node1,
 				Idle: buildResource("1", "1G"),
@@ -218,7 +244,7 @@ func TestNodeInfo_SetNode(t *testing.T) {
 				Allocatable: buildResource("10", "10G"),
 				Capacity: buildResource("10", "10G"),
 				ResourceUsage: &NodeUsage{},
-				State: NodeState{Phase: NotReady, Reason: "OutOfSync"},
+				State: NodeState{Phase: Ready, Reason: ""},
 				Tasks: map[TaskID]*TaskInfo{
 					"c1/p1": NewTaskInfo(case01Pod1),
 					"c1/p2": NewTaskInfo(case01Pod2),
@@ -250,10 +276,9 @@ func TestNodeInfo_SetNode(t *testing.T) {

 		// Recover. e.g.: nvidia-device-plugin is restarted successfully
 		ni.SetNode(test.node)
-		test.expected.State = NodeState{Phase: Ready}
-		if !nodeInfoEqual(ni, test.expected) {
+		if !nodeInfoEqual(ni, test.expected2) {
 			t.Errorf("recovered %d: \n expected\t%v, \n got\t\t%v \n",
-				i, test.expected, ni)
+				i, test.expected2, ni)
 		}
 	}
 }
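The updated fixtures are easier to follow with the sums spelled out: pods p1, p2, and p3 request 1+2+6 = 9 CPU and 9G of memory, the updated node allocates only 8 CPU / 8G, so Idle lands at -1 CPU / -1G while the node now stays Phase: Ready instead of NotReady/OutOfSync. A quick sanity check of that arithmetic, interpreting the buildResource strings as cores and bytes:

package main

import "fmt"

func main() {
	allocatable := []float64{8, 8e9}              // updated node case01Node2: 8 CPU, 8G
	used := []float64{1 + 2 + 6, 1e9 + 2e9 + 6e9} // pods p1, p2, p3: 9 CPU, 9G in total
	fmt.Println(allocatable[0]-used[0], allocatable[1]-used[1]) // -1 -1e9, i.e. buildResource("-1", "-1G")
}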
18 changes: 11 additions & 7 deletions pkg/scheduler/api/resource_info.go
@@ -405,11 +405,12 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) bool {
 	return true
 }

-// LessEqualWithReason returns true, "" only on condition that all dimensions of resources in r are less than or equal with that of rr,
-// Otherwise returns false and err string ,which show which resource is insufficient.
+// LessEqualWithResourcesName returns true, []string{} only when every resource dimension in r is less than or equal to the corresponding dimension in rr;
+// otherwise it returns false and the names of the insufficient resources.
 // @param defaultValue "default value for resource dimension not defined in ScalarResources. Its value can only be one of 'Zero' and 'Infinity'"
 // this function is the same as LessEqual, and it will be merged into LessEqual in the future
-func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) {
+func (r *Resource) LessEqualWithResourcesName(rr *Resource, defaultValue DimensionDefaultValue) (bool, []string) {
+	resources := []string{}
 	lessEqualFunc := func(l, r, diff float64) bool {
 		if l < r || math.Abs(l-r) < diff {
 			return true
@@ -418,10 +419,10 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) {
 	}

 	if !lessEqualFunc(r.MilliCPU, rr.MilliCPU, minResource) {
-		return false, "Insufficient cpu"
+		resources = append(resources, "cpu")
 	}
 	if !lessEqualFunc(r.Memory, rr.Memory, minResource) {
-		return false, "Insufficient memory"
+		resources = append(resources, "memory")
 	}

 	for resourceName, leftValue := range r.ScalarResources {
@@ -431,10 +432,13 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) {
 		}

 		if !lessEqualFunc(leftValue, rightValue, minResource) {
-			return false, "Insufficient " + string(resourceName)
+			resources = append(resources, string(resourceName))
 		}
 	}
-	return true, ""
+	if len(resources) > 0 {
+		return false, resources
+	}
+	return true, resources
 }

 // LessPartly returns true if there exists any dimension whose resource amount in r is less than that in rr.
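The behavioral difference from LessEqualWithReason is that the comparison no longer stops at the first failing dimension: every insufficient resource name is collected, which is what lets the node_info.go log lines above and the test expectations below list several resources at once. A runnable sketch of the collection logic over plain maps (the real method works on Resource fields and honors the Zero/Infinity default-value parameter; minResource here is an assumed epsilon):

package main

import (
	"fmt"
	"math"
)

const minResource = 0.1 // assumed epsilon; the real constant lives in resource_info.go

// lessEqualWithResourcesName collects the names of all dimensions of l that
// exceed r, instead of returning at the first insufficient one.
func lessEqualWithResourcesName(l, r map[string]float64) (bool, []string) {
	resources := []string{}
	lessEqualFunc := func(lv, rv float64) bool {
		return lv < rv || math.Abs(lv-rv) < minResource
	}
	for _, name := range []string{"cpu", "memory", "scalar.test/scalar1", "hugepages-test"} {
		if !lessEqualFunc(l[name], r[name]) { // a missing dimension defaults to zero
			resources = append(resources, name)
		}
	}
	if len(resources) > 0 {
		return false, resources
	}
	return true, resources
}

func main() {
	req := map[string]float64{"cpu": 4000, "memory": 2000, "scalar.test/scalar1": 1000, "hugepages-test": 2000}
	ok, names := lessEqualWithResourcesName(req, map[string]float64{})
	fmt.Println(ok, names) // false [cpu memory scalar.test/scalar1 hugepages-test]
}

This mirrors the third testsForDefaultZero case below, which now expects all four names where the old API stopped at "Insufficient cpu".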
30 changes: 15 additions & 15 deletions pkg/scheduler/api/resource_info_test.go
@@ -1256,12 +1256,12 @@ func TestResource_LessEqualResource(t *testing.T) {
 	testsForDefaultZero := []struct {
 		resource1 *Resource
 		resource2 *Resource
-		expected  string
+		expected  []string
 	}{
 		{
 			resource1: &Resource{},
 			resource2: &Resource{},
-			expected:  "",
+			expected:  []string{},
 		},
 		{
 			resource1: &Resource{},
@@ -1270,7 +1270,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 2000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
 			},
-			expected: "",
+			expected: []string{},
 		},
 		{
 			resource1: &Resource{
@@ -1279,7 +1279,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
 			},
 			resource2: &Resource{},
-			expected:  "Insufficient cpu",
+			expected:  []string{"cpu", "memory", "scalar.test/scalar1", "hugepages-test"},
 		},
 		{
 			resource1: &Resource{
@@ -1292,7 +1292,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 8000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
 			},
-			expected: "",
+			expected: []string{},
 		},
 		{
 			resource1: &Resource{
@@ -1305,7 +1305,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 8000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
 			},
-			expected: "",
+			expected: []string{},
 		},
 		{
 			resource1: &Resource{
@@ -1318,7 +1318,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 8000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
 			},
-			expected: "",
+			expected: []string{},
 		},
 		{
 			resource1: &Resource{
@@ -1331,7 +1331,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 8000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
 			},
-			expected: "Insufficient scalar.test/scalar1",
+			expected: []string{"scalar.test/scalar1"},
 		},
 		{
 			resource1: &Resource{
@@ -1344,19 +1344,19 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 8000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
 			},
-			expected: "Insufficient cpu",
+			expected: []string{"cpu"},
 		},
 	}

 	testsForDefaultInfinity := []struct {
 		resource1 *Resource
 		resource2 *Resource
-		expected  string
+		expected  []string
 	}{
 		{
 			resource1: &Resource{},
 			resource2: &Resource{},
-			expected:  "",
+			expected:  []string{},
 		},
 		{
 			resource1: &Resource{},
@@ -1365,7 +1365,7 @@ func TestResource_LessEqualResource(t *testing.T) {
 				Memory: 2000,
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
 			},
-			expected: "",
+			expected: []string{},
 		},
 		{
 			resource1: &Resource{
@@ -1374,18 +1374,18 @@ func TestResource_LessEqualResource(t *testing.T) {
 				ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
 			},
 			resource2: &Resource{},
-			expected:  "Insufficient cpu",
+			expected:  []string{"cpu", "memory"},
 		},
 	}

 	for _, test := range testsForDefaultZero {
-		_, reason := test.resource1.LessEqualWithReason(test.resource2, Zero)
+		_, reason := test.resource1.LessEqualWithResourcesName(test.resource2, Zero)
 		if !reflect.DeepEqual(test.expected, reason) {
 			t.Errorf("expected: %#v, got: %#v", test.expected, reason)
 		}
 	}
 	for caseID, test := range testsForDefaultInfinity {
-		_, reason := test.resource1.LessEqualWithReason(test.resource2, Infinity)
+		_, reason := test.resource1.LessEqualWithResourcesName(test.resource2, Infinity)
 		if !reflect.DeepEqual(test.expected, reason) {
 			t.Errorf("caseID %d expected: %#v, got: %#v", caseID, test.expected, reason)
 		}
8 changes: 8 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
@@ -114,3 +114,11 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError {
 func (f *FitError) Error() string {
 	return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", "))
 }
+
+// WrapInsufficientResourceReason wraps a list of insufficient resource names into a single reason string.
+func WrapInsufficientResourceReason(resources []string) string {
+	if len(resources) == 0 {
+		return ""
+	}
+	return "Insufficient " + resources[0]
+}
6 changes: 0 additions & 6 deletions pkg/scheduler/cache/event_handlers.go
@@ -76,12 +76,6 @@ func (sc *SchedulerCache) addTask(pi *schedulingapi.TaskInfo) error {
 	node := sc.Nodes[pi.NodeName]
 	if !isTerminated(pi.Status) {
 		if err := node.AddTask(pi); err != nil {
-			if _, outOfSync := err.(*schedulingapi.AllocateFailError); outOfSync {
-				node.State = schedulingapi.NodeState{
-					Phase:  schedulingapi.NotReady,
-					Reason: "OutOfSync",
-				}
-			}
 			return err
 		}
 	} else {