WIP
towca committed Sep 11, 2024
1 parent 48be6fd commit 521e499
Showing 32 changed files with 2,902 additions and 2,336 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -160,7 +160,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
opts.DrainabilityRules = rules.Default(opts.DeleteOptions)
}
if opts.DraProvider == nil {
opts.DraProvider = dynamicresources.NewProvider(informerFactory)
opts.DraProvider = dynamicresources.NewProviderFromInformers(informerFactory)
}

return nil
@@ -100,8 +100,7 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking(uns
return corev1helpers.PodPriority(unschedulableCandidates[i].Pod) > corev1helpers.PodPriority(unschedulableCandidates[j].Pod)
})

// TODO(DRA): Stop casting to naked Pods after ScaleUp works on PodResourceInfos.
statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, clustersnapshot.ToPods(unschedulableCandidates), p.nodeFilter, false)
statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, p.nodeFilter, false)
if err != nil {
return nil, err
}
7 changes: 6 additions & 1 deletion cluster-autoscaler/core/scaledown/planner/planner.go
@@ -246,7 +246,12 @@ func (p *Planner) injectPods(pods []*apiv1.Pod) error {
pods = pod_util.ClearPodNodeNames(pods)
// Note: We're using ScheduleAnywhere, but the pods won't schedule back
// on the drained nodes due to taints.
statuses, _, err := p.actuationInjector.TrySchedulePods(p.context.ClusterSnapshot, pods, scheduling.ScheduleAnywhere, true)
// TODO(DRA): Figure out.
var podRes []*clustersnapshot.PodResourceInfo
for _, pod := range pods {
podRes = append(podRes, &clustersnapshot.PodResourceInfo{Pod: pod})
}
statuses, _, err := p.actuationInjector.TrySchedulePods(p.context.ClusterSnapshot, podRes, scheduling.ScheduleAnywhere, true)
if err != nil {
return fmt.Errorf("cannot scale down, an unexpected error occurred: %v", err)
}
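The TODO(DRA) loop above wraps each injected pod in a PodResourceInfo without any claim information. If the same conversion is needed in more places, it could live in a small helper; the following is a minimal sketch, assuming PodResourceInfo only needs its Pod field set here. The helper name is illustrative and not part of this commit, and it reuses the planner package's existing apiv1 and clustersnapshot imports.

```go
// podResourceInfos wraps naked pods into PodResourceInfos without attaching any
// DRA claims, mirroring the loop in Planner.injectPods above.
func podResourceInfos(pods []*apiv1.Pod) []*clustersnapshot.PodResourceInfo {
	infos := make([]*clustersnapshot.PodResourceInfo, 0, len(pods))
	for _, pod := range pods {
		infos = append(infos, &clustersnapshot.PodResourceInfo{Pod: pod})
	}
	return infos
}
```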
@@ -579,7 +579,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
var schedulablePodGroups []estimator.PodEquivalenceGroup
for _, eg := range podEquivalenceGroups {
samplePod := eg.Pods[0]
if err := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod.Pod, nodeInfo.Node().Name); err == nil {
if err, _ := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
// Add pods to option.
schedulablePodGroups = append(schedulablePodGroups, estimator.PodEquivalenceGroup{
Pods: eg.Pods,
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -262,6 +262,8 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
a.ClusterSnapshot.Clear()

a.ClusterSnapshot.SetGlobalResourceSlices(a.ClusterSnapshot.DraObjectsSource.NonNodeLocalResourceSlices)
a.ClusterSnapshot.SetAllResourceClaims(a.ClusterSnapshot.DraObjectsSource.AllResourceClaims())
knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := a.ClusterSnapshot.AddNode(clustersnapshot.NewNodeResourceInfo(node, a.ClusterSnapshot.DraObjectsSource)); err != nil {
125 changes: 124 additions & 1 deletion cluster-autoscaler/core/static_autoscaler_test.go
@@ -27,6 +27,7 @@ import (
"testing"
"time"

resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
mockprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mocks"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
@@ -199,6 +200,9 @@ type commonMocks struct {
daemonSetLister *daemonSetListerMock
nodeDeletionTracker *deletiontracker.NodeDeletionTracker

resourceClaimLister *fakeResourceClaimLister
resourceSliceLister *fakeResourceSliceLister

onScaleUp *onScaleUpMock
onScaleDown *onScaleDownMock
}
@@ -304,6 +308,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
draProvider: dynamicresources.NewProvider(config.mocks.resourceClaimLister, config.mocks.resourceSliceLister),
}

return autoscaler, nil
@@ -867,7 +872,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
onNodeGroupDeleteMock.On("Delete", "autoprovisioned-"+
"TN1").Return(nil).Once()
"TN1").Return(nil).Once()
onScaleDownMock.On("ScaleDown", "autoprovisioned-TN2", "n2").Return(nil).Once()

err = autoscaler.RunOnce(time.Now().Add(2 * time.Hour))
@@ -2678,3 +2683,121 @@ func newEstimatorBuilder() estimator.EstimatorBuilder {

return estimatorBuilder
}

type fakeResourceClaimLister struct {
claims []*resourceapi.ResourceClaim
}

func (l *fakeResourceClaimLister) List() ([]*resourceapi.ResourceClaim, error) {
if l == nil {
return nil, nil
}
return l.claims, nil
}

type fakeResourceSliceLister struct {
slices []*resourceapi.ResourceSlice
}

func (l *fakeResourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
if l == nil {
return nil, nil
}
return l.slices, nil
}

func TestStaticAutoscalerDynamicResources(t *testing.T) {
opts := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Minute,
ScaleDownUtilizationThreshold: 0.5,
MaxNodeProvisionTime: 10 * time.Second,
},
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
MaxNodesTotal: 10,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
}
now := time.Now()

node1 := BuildTestNode("node1", 1000, 1000)
SetNodeReadyState(node1, true, now)
node2 := BuildTestNode("node2", 1000, 1000)
SetNodeReadyState(node2, true, now)

draNode1 := BuildTestNode("draNode1", 1000, 1000)
SetNodeReadyState(draNode1, true, now)
draNode2 := BuildTestNode("draNode2", 1000, 1000)
SetNodeReadyState(draNode2, true, now)

ngs := []*nodeGroup{{
name: "regular",
min: 1,
max: 10,
nodes: []*apiv1.Node{node1, node2},
}, {
name: "dra",
min: 1,
max: 10,
nodes: []*apiv1.Node{draNode1, draNode2},
}}

allNodes := []*apiv1.Node{node1, node2, draNode1, draNode2}

pod1 := BuildTestPod("pod1", 600, 100)
pod1.Spec.NodeName = "node1"
pod2 := BuildTestPod("pod2", 100, 100)

draPod1 := BuildTestPod("draPod1", 600, 100, WithResourceClaim(""), WithResourceClaim())
draPod1.Spec.NodeName = "draNode1"
draPod2 := BuildTestPod("draPod2", 100, 100, WithResourceClaim())

allPods := []*apiv1.Pod{pod1, pod2, draPod1, draPod2}

mocks := newCommonMocks()
mocks.readyNodeLister.SetNodes(allNodes)
mocks.allNodeLister.SetNodes(allNodes)
mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil)
mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil)

testSetupConfig := &autoscalerSetupConfig{
autoscalingOptions: opts,
nodeGroups: ngs,
nodeStateUpdateTime: now,
mocks: mocks,
clusterStateConfig: clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
},
}

testCases := map[string]struct {
setupConfig *autoscalerSetupConfig
pods []*apiv1.Pod
expectedScaleUp *scaleCall
}{
"basic DRA scale-up": {
pods: allPods,
setupConfig: testSetupConfig,
},
}

for tcName, tc := range testCases {
t.Run(tcName, func(t *testing.T) {
autoscaler, err := setupAutoscaler(tc.setupConfig)
assert.NoError(t, err)

tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Twice()

if tc.expectedScaleUp != nil {
tc.setupConfig.mocks.onScaleUp.On("ScaleUp", tc.expectedScaleUp.ng, tc.expectedScaleUp.delta).Return(nil).Once()
}
err = autoscaler.RunOnce(now.Add(time.Hour))
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, tc.setupConfig.mocks.allPodLister,
tc.setupConfig.mocks.podDisruptionBudgetLister, tc.setupConfig.mocks.daemonSetLister, tc.setupConfig.mocks.onScaleUp, tc.setupConfig.mocks.onScaleDown)
})
}

}
31 changes: 31 additions & 0 deletions cluster-autoscaler/dynamicresources/listers.go
@@ -0,0 +1,31 @@
package dynamicresources

import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/labels"
resourceapilisters "k8s.io/client-go/listers/resource/v1alpha3"
)

type resourceClaimLister interface {
List() ([]*resourceapi.ResourceClaim, error)
}

type resourceSliceLister interface {
List() (ret []*resourceapi.ResourceSlice, err error)
}

type resourceClaimApiLister struct {
apiLister resourceapilisters.ResourceClaimLister
}

func (l *resourceClaimApiLister) List() ([]*resourceapi.ResourceClaim, error) {
return l.apiLister.List(labels.Everything())
}

type resourceSliceApiLister struct {
apiLister resourceapilisters.ResourceSliceLister
}

func (l *resourceSliceApiLister) List() (ret []*resourceapi.ResourceSlice, err error) {
return l.apiLister.List(labels.Everything())
}
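Because the two lister interfaces above only require a List method, a Provider can be fed static objects without informers, as the fakes in static_autoscaler_test.go do. Below is a minimal sketch of an in-memory variant, assuming only the interfaces here and the NewProvider constructor from this diff; the staticClaimLister/staticSliceLister/newStaticProvider names are illustrative and not part of this commit.

```go
// Sketch: in-memory listers satisfying resourceClaimLister/resourceSliceLister,
// useful for wiring a Provider in tests without informers. Uses the package's
// existing resourceapi import.
type staticClaimLister struct{ claims []*resourceapi.ResourceClaim }

func (l staticClaimLister) List() ([]*resourceapi.ResourceClaim, error) { return l.claims, nil }

type staticSliceLister struct{ slices []*resourceapi.ResourceSlice }

func (l staticSliceLister) List() ([]*resourceapi.ResourceSlice, error) { return l.slices, nil }

// newStaticProvider builds a Provider from fixed claim and slice lists.
func newStaticProvider(claims []*resourceapi.ResourceClaim, slices []*resourceapi.ResourceSlice) *Provider {
	return NewProvider(staticClaimLister{claims: claims}, staticSliceLister{slices: slices})
}
```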
40 changes: 23 additions & 17 deletions cluster-autoscaler/dynamicresources/provider.go
@@ -2,22 +2,25 @@ package dynamicresources

import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
resourceapilisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/klog/v2"
)

// Provider provides DRA-related objects. A zero-value Provider provides no objects and can be used e.g. in tests.
type Provider struct {
resourceClaims resourceapilisters.ResourceClaimLister
resourceSlices resourceapilisters.ResourceSliceLister
resourceClaims resourceClaimLister
resourceSlices resourceSliceLister
}

func NewProvider(informerFactory informers.SharedInformerFactory) *Provider {
func NewProviderFromInformers(informerFactory informers.SharedInformerFactory) *Provider {
claims := &resourceClaimApiLister{apiLister: informerFactory.Resource().V1alpha3().ResourceClaims().Lister()}
slices := &resourceSliceApiLister{apiLister: informerFactory.Resource().V1alpha3().ResourceSlices().Lister()}
return NewProvider(claims, slices)
}

func NewProvider(claims resourceClaimLister, slices resourceSliceLister) *Provider {
return &Provider{
resourceClaims: informerFactory.Resource().V1alpha3().ResourceClaims().Lister(),
resourceSlices: informerFactory.Resource().V1alpha3().ResourceSlices().Lister(),
resourceClaims: claims,
resourceSlices: slices,
}
}

Expand All @@ -27,30 +30,33 @@ func (p *Provider) Snapshot() (Snapshot, error) {
return Snapshot{}, nil
}

claims, err := p.resourceClaims.List(labels.Everything())
claims, err := p.resourceClaims.List()
if err != nil {
return Snapshot{}, err
}
claimMap := make(map[objectRef]*resourceapi.ResourceClaim)
claimMap := make(map[ResourceClaimRef]*resourceapi.ResourceClaim)
for _, claim := range claims {
claimMap[objectRef{name: claim.Name, namespace: claim.Namespace}] = claim
claimMap[ResourceClaimRef{Name: claim.Name, Namespace: claim.Namespace}] = claim
}

slices, err := p.resourceSlices.List(labels.Everything())
slices, err := p.resourceSlices.List()

if err != nil {
return Snapshot{}, err
}
slicesMap := make(map[string][]*resourceapi.ResourceSlice)
var nonNodeLocalSlices []*resourceapi.ResourceSlice
for _, slice := range slices {
if slice.Spec.NodeName == "" {
klog.Warningf("DRA: ignoring non-Node-local ResourceSlice %s/%s", slice.Namespace, slice.Name)
continue
nonNodeLocalSlices = append(nonNodeLocalSlices, slice)
} else {
slicesMap[slice.Spec.NodeName] = append(slicesMap[slice.Spec.NodeName], slice)
}
slicesMap[slice.Spec.NodeName] = append(slicesMap[slice.Spec.NodeName], slice)
}

return Snapshot{
resourceClaimsByRef: claimMap,
resourceSlicesByNodeName: slicesMap,
resourceClaimsByRef: claimMap,
resourceSlicesByNodeName: slicesMap,
NonNodeLocalResourceSlices: nonNodeLocalSlices,
}, nil
}
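With the previous warn-and-skip path replaced by NonNodeLocalResourceSlices, callers are now expected to push the global slices and the full claim list into the cluster snapshot themselves, which is what initializeClusterSnapshot in static_autoscaler.go does above. The fragment below is a condensed, illustrative version of that flow, using only calls that appear elsewhere in this diff; the draProvider, clusterSnapshot and nodes variables are assumed to be in scope, and the exact plumbing onto DraObjectsSource is outside this section.

```go
// Illustrative flow for consuming a DRA Snapshot when rebuilding the cluster snapshot.
draSnapshot, err := draProvider.Snapshot()
if err != nil {
	return err
}
clusterSnapshot.Clear()
// Non-node-local ResourceSlices and the full claim list are global to the snapshot...
clusterSnapshot.SetGlobalResourceSlices(draSnapshot.NonNodeLocalResourceSlices)
clusterSnapshot.SetAllResourceClaims(draSnapshot.AllResourceClaims())
// ...while node-local slices are attached per node via NewNodeResourceInfo.
for _, node := range nodes {
	if err := clusterSnapshot.AddNode(clustersnapshot.NewNodeResourceInfo(node, draSnapshot)); err != nil {
		return err
	}
}
```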
21 changes: 15 additions & 6 deletions cluster-autoscaler/dynamicresources/snapshot.go
@@ -9,15 +9,16 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type objectRef struct {
name string
namespace string
type ResourceClaimRef struct {
Name string
Namespace string
}

// Snapshot contains a point-in-time view of all DRA-related objects that CA potentially needs to simulate.
type Snapshot struct {
resourceClaimsByRef map[objectRef]*resourceapi.ResourceClaim
resourceSlicesByNodeName map[string][]*resourceapi.ResourceSlice
resourceClaimsByRef map[ResourceClaimRef]*resourceapi.ResourceClaim
resourceSlicesByNodeName map[string][]*resourceapi.ResourceSlice
NonNodeLocalResourceSlices []*resourceapi.ResourceSlice
}

func (s Snapshot) PodResourceRequests(pod *apiv1.Pod) schedulerframework.PodDynamicResourceRequests {
@@ -41,13 +42,21 @@ func (s Snapshot) NodeResources(node *apiv1.Node) schedulerframework.NodeDynamic
}
}

func (s Snapshot) AllResourceClaims() []*resourceapi.ResourceClaim {
var result []*resourceapi.ResourceClaim
for _, claim := range s.resourceClaimsByRef {
result = append(result, claim)
}
return result
}

func (s Snapshot) claimForPod(pod *apiv1.Pod, claimRef apiv1.PodResourceClaim) (*resourceapi.ResourceClaim, error) {
claimName := claimRefToName(pod, claimRef)
if claimName == "" {
return nil, fmt.Errorf("couldn't determine ResourceClaim name")
}

claim, found := s.resourceClaimsByRef[objectRef{name: claimName, namespace: pod.Namespace}]
claim, found := s.resourceClaimsByRef[ResourceClaimRef{Name: claimName, Namespace: pod.Namespace}]
if !found {
return nil, fmt.Errorf("couldn't find ResourceClaim %q", claimName)
}
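claimForPod above relies on claimRefToName, which this hunk doesn't show. One plausible implementation follows the standard pod API resolution rules: a claim reference either names a ResourceClaim directly, or points at a template whose generated claim name the controller records in the pod status. The sketch below is a hedged illustration under that assumption, not necessarily the helper as committed.

```go
// Plausible sketch of claimRefToName (not shown in this diff): resolve the concrete
// ResourceClaim name for one of a pod's claim references. Returns "" if the name
// can't be determined yet (e.g. the generated claim isn't recorded in status).
func claimRefToName(pod *apiv1.Pod, claimRef apiv1.PodResourceClaim) string {
	if claimRef.ResourceClaimName != nil {
		return *claimRef.ResourceClaimName
	}
	for _, status := range pod.Status.ResourceClaimStatuses {
		if status.Name == claimRef.Name && status.ResourceClaimName != nil {
			return *status.ResourceClaimName
		}
	}
	return ""
}
```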