Skip to content

Commit

Permalink
Merge pull request #599 from ffromani/nrt-extra-informer
Browse files Browse the repository at this point in the history
nrt: cache: add support for dedicated informer
  • Loading branch information
k8s-ci-robot authored Jan 18, 2024
2 parents d9c3dcc + e466bc4 commit 93c518b
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 169 deletions.
13 changes: 13 additions & 0 deletions apis/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ const (
CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources"
)

// CacheInformerMode is a "string" type
type CacheInformerMode string

const (
CacheInformerShared CacheInformerMode = "Shared"
CacheInformerDedicated CacheInformerMode = "Dedicated"
)

// NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache.
type NodeResourceTopologyCache struct {
// ForeignPodsDetect sets how foreign pods should be handled.
Expand All @@ -192,6 +200,11 @@ type NodeResourceTopologyCache struct {
// Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes
// is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All".
ResyncMethod *CacheResyncMethod
// InformerMode controls the channel the cache uses to get updates about pods.
// "Shared" uses the default settings; "Dedicated" creates a specific subscription which is
// guaranteed to best suit the cache needs, at cost of one extra connection.
// If unspecified, default is "Dedicated"
InformerMode *CacheInformerMode
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
5 changes: 5 additions & 0 deletions apis/config/v1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var (

defaultResyncMethod = CacheResyncAutodetect

defaultInformerMode = CacheInformerDedicated

// Defaults for NetworkOverhead
// DefaultWeightsName contains the default costs to be used by networkAware plugins
DefaultWeightsName = "UserDefined"
Expand Down Expand Up @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg
if obj.Cache.ResyncMethod == nil {
obj.Cache.ResyncMethod = &defaultResyncMethod
}
if obj.Cache.InformerMode == nil {
obj.Cache.InformerMode = &defaultInformerMode
}
}

// SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs
Expand Down
1 change: 1 addition & 0 deletions apis/config/v1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) {
Cache: &NodeResourceTopologyCache{
ForeignPodsDetect: &defaultForeignPodsDetect,
ResyncMethod: &defaultResyncMethod,
InformerMode: &defaultInformerMode,
},
},
},
Expand Down
13 changes: 13 additions & 0 deletions apis/config/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ const (
CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources"
)

// CacheInformerMode is a "string" type
type CacheInformerMode string

const (
CacheInformerShared CacheInformerMode = "Shared"
CacheInformerDedicated CacheInformerMode = "Dedicated"
)

// NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache.
type NodeResourceTopologyCache struct {
// ForeignPodsDetect sets how foreign pods should be handled.
Expand All @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct {
// Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes
// is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All".
ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"`
// InformerMode controls the channel the cache uses to get updates about pods.
// "Shared" uses the default settings; "Dedicated" creates a specific subscription which is
// guaranteed to best suit the cache needs, at cost of one extra connection.
// If unspecified, default is "Dedicated"
InformerMode *CacheInformerMode `json:"informerMode,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apis/config/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apis/config/v1beta3/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var (

defaultResyncMethod = CacheResyncAutodetect

defaultInformerMode = CacheInformerDedicated

// Defaults for NetworkOverhead
// DefaultWeightsName contains the default costs to be used by networkAware plugins
DefaultWeightsName = "UserDefined"
Expand Down Expand Up @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg
if obj.Cache.ResyncMethod == nil {
obj.Cache.ResyncMethod = &defaultResyncMethod
}
if obj.Cache.InformerMode == nil {
obj.Cache.InformerMode = &defaultInformerMode
}
}

// SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs
Expand Down
1 change: 1 addition & 0 deletions apis/config/v1beta3/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) {
Cache: &NodeResourceTopologyCache{
ForeignPodsDetect: &defaultForeignPodsDetect,
ResyncMethod: &defaultResyncMethod,
InformerMode: &defaultInformerMode,
},
},
},
Expand Down
13 changes: 13 additions & 0 deletions apis/config/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ const (
CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources"
)

// CacheInformerMode is a "string" type
type CacheInformerMode string

const (
CacheInformerShared CacheInformerMode = "Shared"
CacheInformerDedicated CacheInformerMode = "Dedicated"
)

// NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache.
type NodeResourceTopologyCache struct {
// ForeignPodsDetect sets how foreign pods should be handled.
Expand All @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct {
// Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes
// is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All".
ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"`
// InformerMode controls the channel the cache uses to get updates about pods.
// "Shared" uses the default settings; "Dedicated" creates a specific subscription which is
// guaranteed to best suit the cache needs, at cost of one extra connection.
// If unspecified, default is "Dedicated"
InformerMode *CacheInformerMode `json:"informerMode,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1beta3/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apis/config/v1beta3/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apis/config/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 28 additions & 9 deletions pkg/noderesourcetopology/cache/overreserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (
"github.com/k8stopologyawareschedwg/podfingerprint"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
podlisterv1 "k8s.io/client-go/listers/core/v1"
k8scache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
)

Expand All @@ -50,9 +51,10 @@ type OverReserve struct {
nodesWithForeignPods counter
podLister podlisterv1.PodLister
resyncMethod apiconfig.CacheResyncMethod
isPodRelevant podprovider.PodFilterFunc
}

func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister) (*OverReserve, error) {
func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc) (*OverReserve, error) {
if client == nil || podLister == nil {
return nil, fmt.Errorf("nrtcache: received nil references")
}
Expand All @@ -74,6 +76,7 @@ func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.
nodesWithForeignPods: newCounter(),
podLister: podLister,
resyncMethod: resyncMethod,
isPodRelevant: isPodRelevant,
}
return obj, nil
}
Expand Down Expand Up @@ -199,7 +202,7 @@ func (ov *OverReserve) Resync() {
}

// node -> pod identifier (namespace, name)
nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, logID)
nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, ov.isPodRelevant, logID)
if err != nil {
klog.ErrorS(err, "cannot find the mapping between running pods and nodes")
return
Expand Down Expand Up @@ -267,16 +270,32 @@ func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeRe
}
}

func InformerFromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) {
podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut
return podHandle.Informer(), podHandle.Lister()
}

// to be used only in tests
func (ov *OverReserve) Store() *nrtStore {
return ov.nrts
}

func makeNodeToPodDataMap(podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc, logID string) (map[string][]podData, error) {
nodeToObjsMap := make(map[string][]podData)
pods, err := podLister.List(labels.Everything())
if err != nil {
return nodeToObjsMap, err
}
for _, pod := range pods {
if !isPodRelevant(pod, logID) {
continue
}
nodeObjs := nodeToObjsMap[pod.Spec.NodeName]
nodeObjs = append(nodeObjs, podData{
Namespace: pod.Namespace,
Name: pod.Name,
HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod),
})
nodeToObjsMap[pod.Spec.NodeName] = nodeObjs
}
return nodeToObjsMap, nil
}

func logIDFromTime() string {
return fmt.Sprintf("resync%v", time.Now().UnixMilli())
}
Expand Down
Loading

0 comments on commit 93c518b

Please sign in to comment.