[YUNIKORN-2896] [shim] Remove occupiedResource handling logic #927

Closed
wants to merge 1 commit
4 changes: 2 additions & 2 deletions go.mod
@@ -23,8 +23,8 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/apache/yunikorn-core v0.0.0-20241017135039-079a02dbdfa7
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
8 changes: 4 additions & 4 deletions go.sum
@@ -8,10 +8,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/apache/yunikorn-core v0.0.0-20241017135039-079a02dbdfa7 h1:PY3kIiQYxsNcs42DK+8b7NxfTvMF0Z6eIuK+aJNWl18=
github.com/apache/yunikorn-core v0.0.0-20241017135039-079a02dbdfa7/go.mod h1:JA8Uee+D+T9v3p+YznGiGM9cLk5tzX+EM+YYr1TdFYo=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 h1:CZ4U7y19YSxNJVBNox3DahhuoxDL++naBl/kj+kqVFc=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
31 changes: 7 additions & 24 deletions pkg/cache/context.go
@@ -217,10 +217,8 @@

if !common.Equals(prevCapacity, newCapacity) {
// update capacity
if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
}
if err := ctx.updateNodeResources(node, newCapacity); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))

Codecov / codecov/patch warning: added line pkg/cache/context.go#L221 was not covered by tests
} else {
log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
}
@@ -372,7 +370,7 @@
if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
if ctx.schedulerCache.UpdatePod(pod) {
// pod was accepted by a real node
log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update",
log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger foreign resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
@@ -398,7 +396,7 @@
// 3. pod references a known node
if oldPod != nil && utils.IsPodTerminated(pod) {
if !ctx.schedulerCache.IsPodOrphaned(string(pod.UID)) {
log.Log(log.ShimContext).Debug("pod terminated, trigger occupied resource update",
log.Log(log.ShimContext).Debug("pod terminated, trigger foreign resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
@@ -413,7 +411,7 @@
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod",
log.Log(log.ShimContext).Info("skipping foreign resource update for terminated orphaned pod",

Codecov / codecov/patch warning: added line pkg/cache/context.go#L414 was not covered by tests
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
@@ -470,20 +468,6 @@
ctx.schedulerCache.RemovePod(pod)
}

//nolint:unused
func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {
if common.IsZero(resource) {
return
}
if node, capacity, occupied, ok := ctx.schedulerCache.UpdateOccupiedResource(nodeName, namespace, podName, resource, opt); ok {
if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("scheduler rejected update to node occupied resources", zap.Error(err))
}
} else {
log.Log(log.ShimContext).Warn("unable to update occupied resources for node", zap.String("nodeName", nodeName))
}
}

// filter configMap for the scheduler
func (ctx *Context) filterConfigMaps(obj interface{}) bool {
switch obj := obj.(type) {
@@ -1514,7 +1498,6 @@
constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
},
SchedulableResource: common.GetNodeResource(&nodeStatus),
OccupiedResource: common.NewResourceBuilder().Build(),
})
pendingNodes[node.Name] = node
}
@@ -1596,8 +1579,8 @@
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, nil)
func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

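To make the net change in this file easier to read, here is a condensed sketch of the capacity-update path as it stands after this PR. It only reuses identifiers that appear in the hunks above; the wrapper function `handleCapacityChange` is hypothetical and exists purely for illustration, since in the real code this logic sits inside `Context.updateNode`.

```go
// Sketch only: identifiers are taken from the diff above; handleCapacityChange
// is a hypothetical wrapper for illustration (the real logic lives in updateNode).
func (ctx *Context) handleCapacityChange(node *v1.Node, prevCapacity, newCapacity *si.Resource) {
	if !common.Equals(prevCapacity, newCapacity) {
		// No UpdateCapacity call or occupied-resource bookkeeping anymore:
		// the new allocatable capacity is forwarded to the core as-is.
		if err := ctx.updateNodeResources(node, newCapacity); err != nil {
			log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
		}
	}
}

// updateNodeResources after this PR: capacity only, no occupied parameter.
func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource) error {
	request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity)
	return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
```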
23 changes: 4 additions & 19 deletions pkg/cache/context_test.go
@@ -36,9 +36,7 @@ import (
k8sEvents "k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"

schedulercache "github.com/apache/yunikorn-k8shim/pkg/cache/external"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/test"
@@ -183,8 +181,8 @@ func TestUpdateNodes(t *testing.T) {
})

oldNodeResource := make(map[v1.ResourceName]resource.Quantity)
oldNodeResource[v1.ResourceName("memory")] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI)
oldNodeResource[v1.ResourceName("cpu")] = *resource.NewQuantity(2, resource.DecimalSI)
oldNodeResource["memory"] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI)
oldNodeResource["cpu"] = *resource.NewQuantity(2, resource.DecimalSI)
oldNode := v1.Node{
ObjectMeta: apis.ObjectMeta{
Name: Host1,
@@ -197,8 +195,8 @@
}

newNodeResource := make(map[v1.ResourceName]resource.Quantity)
newNodeResource[v1.ResourceName("memory")] = *resource.NewQuantity(2048*1000*1000, resource.DecimalSI)
newNodeResource[v1.ResourceName("cpu")] = *resource.NewQuantity(4, resource.DecimalSI)
newNodeResource["memory"] = *resource.NewQuantity(2048*1000*1000, resource.DecimalSI)
newNodeResource["cpu"] = *resource.NewQuantity(4, resource.DecimalSI)
newNode := v1.Node{
ObjectMeta: apis.ObjectMeta{
Name: Host1,
@@ -212,12 +210,6 @@

ctx.addNode(&oldNode)
ctx.updateNode(&oldNode, &newNode)

_, capacity, _, ok := ctx.schedulerCache.UpdateOccupiedResource(
Host1, "n/a", "n/a", nil, schedulercache.AddOccupiedResource)
assert.Assert(t, ok, "unable to retrieve node capacity")
assert.Equal(t, int64(2048*1000*1000), capacity.Resources[siCommon.Memory].Value)
assert.Equal(t, int64(4000), capacity.Resources[siCommon.CPU].Value)
}

func TestDeleteNodes(t *testing.T) {
@@ -1915,13 +1907,6 @@ func TestInitializeState(t *testing.T) {
assert.Equal(t, *pc.PreemptionPolicy, policy, "wrong preemption policy")
assert.Equal(t, pc.Annotations[constants.AnnotationAllowPreemption], constants.True, "wrong allow-preemption value")

// verify occupied / capacity on node
capacity, _, ok := context.schedulerCache.SnapshotResources(nodeName1)
assert.Assert(t, ok, "Unable to retrieve node resources")
expectedCapacity := common.ParseResource("4", "10G")
assert.Equal(t, expectedCapacity.Resources["vcore"].Value, capacity.Resources["vcore"].Value, "wrong capacity vcore")
assert.Equal(t, expectedCapacity.Resources["memory"].Value, capacity.Resources["memory"].Value, "wrong capacity memory")

// check that pod orphan status is correct
assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName1), "pod1 should not be orphaned")
assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName2), "pod2 should not be orphaned")
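Beyond dropping the assertions that exercised the removed `UpdateOccupiedResource` and `SnapshotResources` cache APIs, the test now builds its resource maps with plain string keys. That works because `v1.ResourceName` is a defined string type, so untyped string constants convert to it implicitly. A minimal standalone illustration of that idiom (standard Kubernetes API types only, not code from this PR):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// "memory" and "cpu" are untyped string constants, so they convert
	// implicitly to v1.ResourceName; the explicit v1.ResourceName("memory")
	// conversions the old test used were redundant.
	allocatable := make(map[v1.ResourceName]resource.Quantity)
	allocatable["memory"] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI)
	allocatable["cpu"] = *resource.NewQuantity(2, resource.DecimalSI)

	// Copy to a local variable before calling the pointer-receiver String method.
	cpu := allocatable[v1.ResourceCPU]
	fmt.Println(cpu.String()) // prints "2"
}
```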
83 changes: 0 additions & 83 deletions pkg/cache/external/scheduler_cache.go
@@ -32,18 +32,9 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

type UpdateType int

const (
AddOccupiedResource UpdateType = iota
SubOccupiedResource
)

// SchedulerCache maintains some critical information about nodes and pods used for scheduling.
@@ -59,8 +50,6 @@
// is called in the plugin to signify completion of the allocation, it is removed.
type SchedulerCache struct {
nodesMap map[string]*framework.NodeInfo // node name to NodeInfo map
nodeCapacity map[string]*si.Resource // node name to node resource capacity
nodeOccupied map[string]*si.Resource // node name to node occupied resources
podsMap map[string]*v1.Pod
pcMap map[string]*schedulingv1.PriorityClass
assignedPods map[string]string // map of pods to the node they are currently assigned to
@@ -90,8 +79,6 @@ type taskBloomFilter struct {
func NewSchedulerCache(clients *client.Clients) *SchedulerCache {
cache := &SchedulerCache{
nodesMap: make(map[string]*framework.NodeInfo),
nodeCapacity: make(map[string]*si.Resource),
nodeOccupied: make(map[string]*si.Resource),
podsMap: make(map[string]*v1.Pod),
pcMap: make(map[string]*schedulingv1.PriorityClass),
assignedPods: make(map[string]string),
@@ -197,8 +184,6 @@ func (cache *SchedulerCache) updateNode(node *v1.Node) (*v1.Node, []*v1.Pod) {
log.Log(log.ShimCacheExternal).Debug("Adding node to cache", zap.String("nodeName", node.Name))
nodeInfo = framework.NewNodeInfo()
cache.nodesMap[node.Name] = nodeInfo
cache.nodeCapacity[node.Name] = common.GetNodeResource(&node.Status)
cache.nodeOccupied[node.Name] = common.NewResourceBuilder().Build()
cache.nodesInfo = nil
nodeInfo.SetNode(node)

@@ -253,8 +238,6 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) (*v1.Node, []*v1.Pod) {

log.Log(log.ShimCacheExternal).Debug("Removing node from cache", zap.String("nodeName", node.Name))
delete(cache.nodesMap, node.Name)
delete(cache.nodeOccupied, node.Name)
delete(cache.nodeCapacity, node.Name)
cache.nodesInfo = nil
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
@@ -263,72 +246,6 @@
return result, orphans
}

func (cache *SchedulerCache) SnapshotResources(nodeName string) (capacity *si.Resource, occupied *si.Resource, ok bool) {
cache.lock.RLock()
defer cache.lock.RUnlock()

occupied, ok1 := cache.nodeOccupied[nodeName]
capacity, ok2 := cache.nodeCapacity[nodeName]
if !ok1 || !ok2 {
log.Log(log.ShimCacheExternal).Warn("Unable to snapshot resources for node", zap.String("nodeName", nodeName))
return nil, nil, false
}
return capacity, occupied, true
}

func (cache *SchedulerCache) UpdateCapacity(nodeName string, resource *si.Resource) (capacity *si.Resource, occupied *si.Resource, ok bool) {
cache.lock.Lock()
defer cache.lock.Unlock()

occupied, ok1 := cache.nodeOccupied[nodeName]
_, ok2 := cache.nodeCapacity[nodeName]
if !ok1 || !ok2 {
log.Log(log.ShimCacheExternal).Warn("Unable to update capacity for node", zap.String("nodeName", nodeName))
return nil, nil, false
}
cache.nodeCapacity[nodeName] = resource
return resource, occupied, true
}

func (cache *SchedulerCache) UpdateOccupiedResource(nodeName string, namespace string, podName string, resource *si.Resource, opt UpdateType) (node *v1.Node, capacity *si.Resource, occupied *si.Resource, ok bool) {
cache.lock.Lock()
defer cache.lock.Unlock()

nodeInfo, ok1 := cache.nodesMap[nodeName]
occupied, ok2 := cache.nodeOccupied[nodeName]
capacity, ok3 := cache.nodeCapacity[nodeName]
if !ok1 || !ok2 || !ok3 {
log.Log(log.ShimCacheExternal).Warn("Unable to update occupied resources for node",
zap.String("nodeName", nodeName),
zap.String("namespace", namespace),
zap.String("podName", podName))
return nil, nil, nil, false
}
node = nodeInfo.Node()

switch opt {
case AddOccupiedResource:
log.Log(log.ShimCacheExternal).Info("Adding occupied resources to node",
zap.String("nodeID", nodeName),
zap.String("namespace", namespace),
zap.String("podName", podName),
zap.Stringer("occupied", resource))
occupied = common.Add(occupied, resource)
cache.nodeOccupied[nodeName] = occupied
case SubOccupiedResource:
log.Log(log.ShimCacheExternal).Info("Subtracting occupied resources from node",
zap.String("nodeID", nodeName),
zap.String("namespace", namespace),
zap.String("podName", podName),
zap.Stringer("occupied", resource))
occupied = common.Sub(occupied, resource)
cache.nodeOccupied[nodeName] = occupied
default:
// noop
}
return node, capacity, occupied, true
}

func (cache *SchedulerCache) GetPriorityClass(name string) *schedulingv1.PriorityClass {
cache.lock.RLock()
defer cache.lock.RUnlock()
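For orientation, this is roughly the per-node state that remains in the external scheduler cache once the occupied and capacity maps are gone. It is a condensed sketch assembled from the hunks above, with fields and initialisation untouched by this PR elided, not the full definition:

```go
// Condensed sketch, not the complete type: nodeCapacity and nodeOccupied are
// gone, along with the SnapshotResources, UpdateCapacity and
// UpdateOccupiedResource accessors that exposed them.
type SchedulerCache struct {
	nodesMap     map[string]*framework.NodeInfo // node name to NodeInfo map
	podsMap      map[string]*v1.Pod
	pcMap        map[string]*schedulingv1.PriorityClass
	assignedPods map[string]string // map of pods to the node they are currently assigned to
	// ... remaining fields (locking, bloom filter, PVC tracking) unchanged by this PR
}

func NewSchedulerCache(clients *client.Clients) *SchedulerCache {
	return &SchedulerCache{
		nodesMap:     make(map[string]*framework.NodeInfo),
		podsMap:      make(map[string]*v1.Pod),
		pcMap:        make(map[string]*schedulingv1.PriorityClass),
		assignedPods: make(map[string]string),
		// ... remaining initialisation unchanged by this PR
	}
}
```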
85 changes: 0 additions & 85 deletions pkg/cache/external/scheduler_cache_test.go
@@ -31,7 +31,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
)

const (
@@ -1097,90 +1096,6 @@ func TestUpdatePVCRefCounts(t *testing.T) {
assert.Check(t, !cache.IsPVCUsedByPods(framework.GetNamespacedName(pod2.Namespace, pvcName2)), "pvc2 is in pvcRefCounts")
}

func TestNodeResources(t *testing.T) {
cache := NewSchedulerCache(client.NewMockedAPIProvider(false).GetAPIs())
resourceList := make(map[v1.ResourceName]resource.Quantity)
resourceList["memory"] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI)
resourceList["cpu"] = *resource.NewQuantity(10, resource.DecimalSI)
node := &v1.Node{
ObjectMeta: apis.ObjectMeta{
Name: host1,
Namespace: "default",
UID: nodeUID1,
},
Status: v1.NodeStatus{
Allocatable: resourceList,
},
Spec: v1.NodeSpec{
Unschedulable: false,
},
}
cache.UpdateNode(node)

// test snapshot with missing node
capacity, occupied, ok := cache.SnapshotResources("missing")
assert.Assert(t, !ok, "got result for missing host")
assert.Assert(t, capacity == nil, "got capacity for missing host")
assert.Assert(t, occupied == nil, "got occupied for missing host")

// test snapshot with existing, unoccupied node
capacity, occupied, ok = cache.SnapshotResources(host1)
assert.Assert(t, ok, "no result for host1")
assert.Equal(t, int64(1024*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1")
assert.Equal(t, int64(10*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1")
assert.Equal(t, 0, len(occupied.Resources), "non-empty occupied resources")

res1 := common.NewResourceBuilder().AddResource("memory", 2048*1000*1000).AddResource("vcore", 20000).Build()
res2 := common.NewResourceBuilder().AddResource("memory", 512*1000*1000).AddResource("vcore", 5000).Build()

// update capacity with missing node
capacity, occupied, ok = cache.UpdateCapacity("missing", res1)
assert.Assert(t, !ok, "got result for missing host")
assert.Assert(t, capacity == nil, "got capacity for missing host")
assert.Assert(t, occupied == nil, "got occupied for missing host")

// update capacity with real node
capacity, occupied, ok = cache.UpdateCapacity(host1, res1)
assert.Assert(t, ok, "no result for host1")
assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1")
assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1")
assert.Equal(t, 0, len(occupied.Resources), "non-empty occupied resources")

// update occupied resources with missing node
node, capacity, occupied, ok = cache.UpdateOccupiedResource("missing", "default", "podName", res2, AddOccupiedResource)
assert.Assert(t, !ok, "got result for missing host")
assert.Assert(t, node == nil, "got node for missing host")
assert.Assert(t, capacity == nil, "got capacity for missing host")
assert.Assert(t, occupied == nil, "got occupied for missing host")

// update occupied resources with real node
node, capacity, occupied, ok = cache.UpdateOccupiedResource(host1, "default", "podName", res2, AddOccupiedResource)
assert.Assert(t, ok, "no result for host1")
assert.Equal(t, host1, node.Name, "wrong host name")
assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1")
assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1")
assert.Equal(t, int64(512*1000*1000), occupied.Resources["memory"].Value, "wrong memory occupied for host1")
assert.Equal(t, int64(5*1000), occupied.Resources["vcore"].Value, "wrong vcore occupied for host1")

// retrieve snapshot again
capacity, occupied, ok = cache.SnapshotResources(host1)
assert.Assert(t, ok, "no result for host1")
assert.Equal(t, host1, node.Name, "wrong host name")
assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1")
assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1")
assert.Equal(t, int64(512*1000*1000), occupied.Resources["memory"].Value, "wrong memory occupied for host1")
assert.Equal(t, int64(5*1000), occupied.Resources["vcore"].Value, "wrong vcore occupied for host1")

// subtract occupied resources with real node
node, capacity, occupied, ok = cache.UpdateOccupiedResource(host1, "default", "podName", res2, SubOccupiedResource)
assert.Assert(t, ok, "no result for host1")
assert.Equal(t, host1, node.Name, "wrong host name")
assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1")
assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1")
assert.Equal(t, int64(0), occupied.Resources["memory"].Value, "wrong memory occupied for host1")
assert.Equal(t, int64(0), occupied.Resources["vcore"].Value, "wrong vcore occupied for host1")
}

func TestOrphanPods(t *testing.T) {
cache := NewSchedulerCache(client.NewMockedAPIProvider(false).GetAPIs())
resourceList := make(map[v1.ResourceName]resource.Quantity)