CA: move NodeInfo methods from ClusterSnapshotStore to ClusterSnapshot
All the NodeInfo methods have to take DRA into account, and the logic
for that will be the same for different ClusterSnapshotStore implementations.
Instead of duplicating the new logic in Basic and Delta, the methods
are moved to ClusterSnapshot and the logic will be implemented once in
PredicateSnapshot.

PredicateSnapshot will use the DRA Snapshot exposed by its ClusterSnapshotStore
to implement these methods. The DRA Snapshot has to be stored in the
ClusterSnapshotStore layer, as we need to be able to fork/commit/revert it.
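
To make the fork/commit/revert requirement concrete, here is a minimal, self-contained sketch of why the DRA state has to be saved and restored in the same layer as the scheduler-level node state. All types below are illustrative stand-ins, not the actual ClusterSnapshotStore API or the Basic/Delta implementations:

package main

import "fmt"

// snapshotState stands in for everything a ClusterSnapshotStore tracks: the
// scheduler-level NodeInfos and the DRA objects. Both have to be saved and
// restored together for Fork/Commit/Revert to stay consistent.
type snapshotState struct {
    nodeInfos map[string]string // nodeName -> opaque node/pod data
    draClaims map[string]string // claimName -> opaque DRA claim state
}

func (s snapshotState) clone() snapshotState {
    out := snapshotState{nodeInfos: map[string]string{}, draClaims: map[string]string{}}
    for k, v := range s.nodeInfos {
        out.nodeInfos[k] = v
    }
    for k, v := range s.draClaims {
        out.draClaims[k] = v
    }
    return out
}

// forkableStore keeps a stack of saved states, mirroring how a store
// implementation can support nested Fork/Commit/Revert.
type forkableStore struct {
    current snapshotState
    saved   []snapshotState
}

func (st *forkableStore) Fork()   { st.saved = append(st.saved, st.current.clone()) }
func (st *forkableStore) Commit() { st.saved = st.saved[:len(st.saved)-1] }
func (st *forkableStore) Revert() {
    st.current = st.saved[len(st.saved)-1]
    st.saved = st.saved[:len(st.saved)-1]
}

func main() {
    st := &forkableStore{current: snapshotState{
        nodeInfos: map[string]string{"n1": "node + scheduled pods"},
        draClaims: map[string]string{"claim-a": "unallocated"},
    }}
    st.Fork()
    st.current.draClaims["claim-a"] = "allocated to n1" // a simulated scheduling decision
    st.Revert()
    fmt.Println(st.current.draClaims["claim-a"]) // prints "unallocated" again
}

If the DRA objects were tracked above the store, a Revert would roll back the NodeInfos but leave any simulated DRA allocations behind, so the two have to be versioned together.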

Lower-level methods for adding/removing just the schedulerframework.NodeInfo
parts are added to ClusterSnapshotStore. PredicateSnapshot utilizes these methods
to implement AddNodeInfo and RemoveNodeInfo.
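
The resulting shape is a thin delegation layer on top of the store. A rough sketch of the pattern with stand-in types (the real interfaces, signatures, and DRA handling are in the diff below; nothing here is the actual CA API):

package main

import "fmt"

// nodeStore is a stand-in for ClusterSnapshotStore: it only knows about
// "scheduler-level" node data and is unaware of DRA.
type nodeStore interface {
    AddSchedulerNode(name string) error
    RemoveSchedulerNode(name string) error
}

// mapStore is a trivial store implementation (think Basic or Delta).
type mapStore struct{ nodes map[string]bool }

func (m *mapStore) AddSchedulerNode(name string) error    { m.nodes[name] = true; return nil }
func (m *mapStore) RemoveSchedulerNode(name string) error { delete(m.nodes, name); return nil }

// predicateSnapshot embeds the store (as PredicateSnapshot embeds
// ClusterSnapshotStore) and implements the higher-level NodeInfo methods once,
// by delegating down - regardless of which store implementation is plugged in.
type predicateSnapshot struct {
    nodeStore
}

func (s *predicateSnapshot) AddNodeInfo(name string) error {
    // A DRA-aware version would also update the DRA snapshot here; in this
    // commit the method still just delegates.
    return s.AddSchedulerNode(name)
}

func (s *predicateSnapshot) RemoveNodeInfo(name string) error {
    return s.RemoveSchedulerNode(name)
}

func main() {
    snap := &predicateSnapshot{nodeStore: &mapStore{nodes: map[string]bool{}}}
    _ = snap.AddNodeInfo("n1")
    fmt.Println(snap.nodeStore.(*mapStore).nodes) // map[n1:true]
    _ = snap.RemoveNodeInfo("n1")
    fmt.Println(snap.nodeStore.(*mapStore).nodes) // map[]
}

Because the store only has to provide the scheduler-level methods, Basic and Delta stay DRA-unaware and the DRA-aware logic can live in one place.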

This should be a no-op; it's just a refactor.
towca committed Dec 6, 2024
1 parent b297715 commit 174dc11
Showing 7 changed files with 101 additions and 96 deletions.
33 changes: 21 additions & 12 deletions cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go
@@ -23,13 +23,27 @@ import (
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// ClusterSnapshot is an abstraction of the cluster state used for predicate simulations.
// It exposes mutation methods and can be viewed as the scheduler's SharedLister.
type ClusterSnapshot interface {
ClusterSnapshotStore

// AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added,
// as well as any DRA objects passed along with them.
AddNodeInfo(nodeInfo *framework.NodeInfo) error
// RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as
// any DRA objects owned by them.
RemoveNodeInfo(nodeName string) error
// GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot.
// This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos
// obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo.
GetNodeInfo(nodeName string) (*framework.NodeInfo, error)
// ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo.
ListNodeInfos() ([]*framework.NodeInfo, error)

// SchedulePod tries to schedule the given Pod on the Node with the given name inside the snapshot,
// checking scheduling predicates. The pod is only scheduled if the predicates pass. If the pod is scheduled,
// all relevant DRA objects are modified to reflect that. Returns nil if the pod got scheduled, and a non-nil
@@ -68,18 +82,13 @@ type ClusterSnapshotStore interface {
// ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot.
ForceRemovePod(namespace string, podName string, nodeName string) error

// AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added,
// as well as any DRA objects passed along them.
AddNodeInfo(nodeInfo *framework.NodeInfo) error
// RemoveNodeInfo removes the given NodeInfo from the snapshot The Node and the Pods are removed, as well as
// any DRA objects owned by them.
RemoveNodeInfo(nodeName string) error
// GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot.
// This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos
// obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo.
GetNodeInfo(nodeName string) (*framework.NodeInfo, error)
// ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo.
ListNodeInfos() ([]*framework.NodeInfo, error)
// AddSchedulerNodeInfo adds the given schedulerframework.NodeInfo to the snapshot without checking scheduler predicates, and
// without taking DRA objects into account. This shouldn't be used outside the clustersnapshot pkg, use ClusterSnapshot.AddNodeInfo()
// instead.
AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error
// RemoveSchedulerNodeInfo removes the given schedulerframework.NodeInfo from the snapshot without taking DRA objects into account. This shouldn't
// be used outside the clustersnapshot pkg, use ClusterSnapshot.RemoveNodeInfo() instead.
RemoveSchedulerNodeInfo(nodeName string) error

// DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot.
DraSnapshot() drasnapshot.Snapshot
@@ -30,27 +30,27 @@ import (

// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework.
type SchedulerPluginRunner struct {
fwHandle *framework.Handle
snapshotStore clustersnapshot.ClusterSnapshotStore
lastIndex int
fwHandle *framework.Handle
snapshot clustersnapshot.ClusterSnapshot
lastIndex int
}

// NewSchedulerPluginRunner builds a SchedulerPluginRunner.
func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotStore clustersnapshot.ClusterSnapshotStore) *SchedulerPluginRunner {
return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotStore: snapshotStore}
func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot) *SchedulerPluginRunner {
return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot}
}

// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided
// function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
//
// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
nodeInfosList, err := p.snapshotStore.ListNodeInfos()
nodeInfosList, err := p.snapshot.ListNodeInfos()
if err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore)
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
defer p.fwHandle.DelegatingLister.ResetDelegate()

state := schedulerframework.NewCycleState()
@@ -101,12 +101,12 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM

// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
nodeInfo, err := p.snapshotStore.GetNodeInfo(nodeName)
nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
if err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore)
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
defer p.fwHandle.DelegatingLister.ResetDelegate()

state := schedulerframework.NewCycleState()
@@ -139,11 +139,9 @@ func TestRunFiltersOnNode(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
snapshotStore := store.NewBasicSnapshotStore()
err := snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(tt.customConfig)
assert.NoError(t, err)

pluginRunner, err := newTestPluginRunner(snapshotStore, tt.customConfig)
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
assert.NoError(t, err)

predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
@@ -235,15 +233,14 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
},
}

snapshotStore := store.NewBasicSnapshotStore()
err = snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(n1000))
assert.NoError(t, err)
err = snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(n2000))
assert.NoError(t, err)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
pluginRunner, err := newTestPluginRunner(snapshotStore, tc.customConfig)
pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(tc.customConfig)
assert.NoError(t, err)

err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000))
assert.NoError(t, err)
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
assert.NoError(t, err)

nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
@@ -274,13 +271,13 @@ func TestDebugInfo(t *testing.T) {
}
SetNodeReadyState(node1, true, time.Time{})

clusterSnapshot := store.NewBasicSnapshotStore()
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
// with default predicate checker
defaultPluginRunner, clusterSnapshot, err := newTestPluginRunnerAndSnapshot(nil)
assert.NoError(t, err)

// with default predicate checker
defaultPluginRunner, err := newTestPluginRunner(clusterSnapshot, nil)
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

predicateErr := defaultPluginRunner.RunFiltersOnNode(p1, "n1")
assert.NotNil(t, predicateErr)
assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -305,25 +302,29 @@ func TestDebugInfo(t *testing.T) {

customConfig, err := scheduler.ConfigFromPath(customConfigFile)
assert.NoError(t, err)
customPluginRunner, err := newTestPluginRunner(clusterSnapshot, customConfig)
customPluginRunner, clusterSnapshot, err := newTestPluginRunnerAndSnapshot(customConfig)
assert.NoError(t, err)

err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

predicateErr = customPluginRunner.RunFiltersOnNode(p1, "n1")
assert.Nil(t, predicateErr)
}

// newTestPluginRunner builds test version of SchedulerPluginRunner.
func newTestPluginRunner(snapshotStore clustersnapshot.ClusterSnapshotStore, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, error) {
func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, clustersnapshot.ClusterSnapshot, error) {
if schedConfig == nil {
defaultConfig, err := scheduler_config_latest.Default()
if err != nil {
return nil, err
return nil, nil, err
}
schedConfig = defaultConfig
}

fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true)
if err != nil {
return nil, err
return nil, nil, err
}
return NewSchedulerPluginRunner(fwHandle, snapshotStore), nil
snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true)
return NewSchedulerPluginRunner(fwHandle, snapshot), snapshot, nil
}
@@ -32,11 +32,44 @@ type PredicateSnapshot struct {

// NewPredicateSnapshot builds a PredicateSnapshot.
func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool) *PredicateSnapshot {
return &PredicateSnapshot{
snapshot := &PredicateSnapshot{
ClusterSnapshotStore: snapshotStore,
pluginRunner: NewSchedulerPluginRunner(fwHandle, snapshotStore),
draEnabled: draEnabled,
}
snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot)
return snapshot
}

// GetNodeInfo returns an internal NodeInfo wrapping the relevant schedulerframework.NodeInfo.
func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
schedNodeInfo, err := s.ClusterSnapshotStore.NodeInfos().Get(nodeName)
if err != nil {
return nil, err
}
return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil
}

// ListNodeInfos returns internal NodeInfos wrapping all schedulerframework.NodeInfos in the snapshot.
func (s *PredicateSnapshot) ListNodeInfos() ([]*framework.NodeInfo, error) {
schedNodeInfos, err := s.ClusterSnapshotStore.NodeInfos().List()
if err != nil {
return nil, err
}
var result []*framework.NodeInfo
for _, schedNodeInfo := range schedNodeInfos {
result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
}
return result, nil
}

// AddNodeInfo adds the provided internal NodeInfo to the snapshot.
func (s *PredicateSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
return s.ClusterSnapshotStore.AddSchedulerNodeInfo(nodeInfo.ToScheduler())
}

// RemoveNodeInfo removes a NodeInfo matching the provided nodeName from the snapshot.
func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {
return s.ClusterSnapshotStore.RemoveSchedulerNodeInfo(nodeName)
}

// SchedulePod adds pod to the snapshot and schedules it to given node.
30 changes: 5 additions & 25 deletions cluster-autoscaler/simulator/clustersnapshot/store/basic.go
@@ -22,7 +22,6 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
@@ -213,31 +212,12 @@ func (snapshot *BasicSnapshotStore) DraSnapshot() drasnapshot.Snapshot {
return snapshot.getInternalData().draSnapshot
}

// GetNodeInfo gets a NodeInfo.
func (snapshot *BasicSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
schedNodeInfo, err := snapshot.getInternalData().getNodeInfo(nodeName)
if err != nil {
return nil, err
}
return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil
}

// ListNodeInfos lists NodeInfos.
func (snapshot *BasicSnapshotStore) ListNodeInfos() ([]*framework.NodeInfo, error) {
schedNodeInfos := snapshot.getInternalData().listNodeInfos()
var result []*framework.NodeInfo
for _, schedNodeInfo := range schedNodeInfos {
result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
}
return result, nil
}

// AddNodeInfo adds a NodeInfo.
func (snapshot *BasicSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
// AddSchedulerNodeInfo adds a NodeInfo.
func (snapshot *BasicSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
if err := snapshot.getInternalData().addNode(nodeInfo.Node()); err != nil {
return err
}
for _, podInfo := range nodeInfo.Pods() {
for _, podInfo := range nodeInfo.Pods {
if err := snapshot.getInternalData().addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
return err
}
@@ -267,8 +247,8 @@ func (snapshot *BasicSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
return nil
}

// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *BasicSnapshotStore) RemoveNodeInfo(nodeName string) error {
// RemoveSchedulerNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *BasicSnapshotStore) RemoveSchedulerNodeInfo(nodeName string) error {
return snapshot.getInternalData().removeNodeInfo(nodeName)
}

30 changes: 5 additions & 25 deletions cluster-autoscaler/simulator/clustersnapshot/store/delta.go
@@ -22,7 +22,6 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
@@ -416,31 +415,12 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {
return drasnapshot.Snapshot{}
}

// GetNodeInfo gets a NodeInfo.
func (snapshot *DeltaSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
schedNodeInfo, err := snapshot.getNodeInfo(nodeName)
if err != nil {
return nil, err
}
return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil
}

// ListNodeInfos lists NodeInfos.
func (snapshot *DeltaSnapshotStore) ListNodeInfos() ([]*framework.NodeInfo, error) {
schedNodeInfos := snapshot.data.getNodeInfoList()
var result []*framework.NodeInfo
for _, schedNodeInfo := range schedNodeInfos {
result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
}
return result, nil
}

// AddNodeInfo adds a NodeInfo.
func (snapshot *DeltaSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
// AddSchedulerNodeInfo adds a NodeInfo.
func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
if err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
return err
}
for _, podInfo := range nodeInfo.Pods() {
for _, podInfo := range nodeInfo.Pods {
if err := snapshot.data.addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
return err
}
@@ -470,8 +450,8 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
return nil
}

// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaSnapshotStore) RemoveNodeInfo(nodeName string) error {
// RemoveSchedulerNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaSnapshotStore) RemoveSchedulerNodeInfo(nodeName string) error {
return snapshot.data.removeNodeInfo(nodeName)
}

@@ -24,7 +24,7 @@ import (

"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

func BenchmarkBuildNodeInfoList(b *testing.B) {
@@ -54,7 +54,9 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
}
deltaStore.Fork()
for _, node := range nodes[tc.nodeCount:] {
if err := deltaStore.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil {
schedNodeInfo := schedulerframework.NewNodeInfo()
schedNodeInfo.SetNode(node)
if err := deltaStore.AddSchedulerNodeInfo(schedNodeInfo); err != nil {
assert.NoError(b, err)
}
}
