Skip to content

Commit

Permalink
TMP
Browse files Browse the repository at this point in the history
  • Loading branch information
towca committed Nov 20, 2024
1 parent b68dcfd commit e6b767a
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig)
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
if err != nil {
return err
}
Expand Down
19 changes: 19 additions & 0 deletions cluster-autoscaler/dynamicresources/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type resourceClaimRef struct {
Expand All @@ -38,6 +39,24 @@ type Snapshot struct {
deviceClasses map[string]*resourceapi.DeviceClass
}

// ResourceClaims exposes the Snapshot as schedulerframework.ResourceClaimTracker, in order to interact with
// the scheduler framework.
func (s Snapshot) ResourceClaims() schedulerframework.ResourceClaimTracker {
return snapshotClaimTracker(s)
}

// ResourceSlices exposes the Snapshot as schedulerframework.ResourceSliceLister, in order to interact with
// the scheduler framework.
func (s Snapshot) ResourceSlices() schedulerframework.ResourceSliceLister {
return snapshotSliceLister(s)
}

// DeviceClasses exposes the Snapshot as schedulerframework.DeviceClassLister, in order to interact with
// the scheduler framework.
func (s Snapshot) DeviceClasses() schedulerframework.DeviceClassLister {
return snapshotClassLister(s)
}

// Clone returns a copy of this Snapshot that can be independently modified without affecting this Snapshot.
// The only mutable objects in the Snapshot are ResourceClaims, so they are deep-copied. The rest is only a
// shallow copy.
Expand Down
89 changes: 89 additions & 0 deletions cluster-autoscaler/dynamicresources/snapshot_claim_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
"fmt"

resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/types"
)

type snapshotClaimTracker Snapshot

func (s snapshotClaimTracker) List() ([]*resourceapi.ResourceClaim, error) {
var result []*resourceapi.ResourceClaim
for _, claim := range s.resourceClaimsByRef {
result = append(result, claim)
}
return result, nil
}

func (s snapshotClaimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
claim, found := s.resourceClaimsByRef[resourceClaimRef{Name: claimName, Namespace: namespace}]
if !found {
return nil, fmt.Errorf("claim %s/%s not found", namespace, claimName)
}
return claim, nil
}

func (s snapshotClaimTracker) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
var result []*resourceapi.ResourceClaim
for _, claim := range s.resourceClaimsByRef {
if ClaimAllocated(claim) {
result = append(result, claim)
}
}
return result, nil
}

func (s snapshotClaimTracker) SignalClaimPendingAllocation(claimUid types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
ref := resourceClaimRef{Name: allocatedClaim.Name, Namespace: allocatedClaim.Namespace}
claim, found := s.resourceClaimsByRef[ref]
if !found {
return fmt.Errorf("claim %s/%s not found", allocatedClaim.Namespace, allocatedClaim.Name)
}
if claim.UID != claimUid {
return fmt.Errorf("claim %s/%s: snapshot has UID %q, allocation came for UID %q - shouldn't happenn", allocatedClaim.Namespace, allocatedClaim.Name, claim.UID, claimUid)
}
s.resourceClaimsByRef[ref] = allocatedClaim
return nil
}

func (s snapshotClaimTracker) ClaimHasPendingAllocation(claimUid types.UID) bool {
return false
}

func (s snapshotClaimTracker) RemoveClaimPendingAllocation(claimUid types.UID) (deleted bool) {
//TODO implement me
panic("implement me")
}

func (s snapshotClaimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
//TODO implement me
panic("implement me")
}

func (s snapshotClaimTracker) AssumedClaimRestore(namespace, claimName string) {
//TODO implement me
panic("implement me")
}

func (s snapshotClaimTracker) GetOriginal(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
//TODO implement me
panic("implement me")
}
41 changes: 41 additions & 0 deletions cluster-autoscaler/dynamicresources/snapshot_class_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
"fmt"

resourceapi "k8s.io/api/resource/v1alpha3"
)

type snapshotClassLister Snapshot

func (s snapshotClassLister) List() ([]*resourceapi.DeviceClass, error) {
var result []*resourceapi.DeviceClass
for _, class := range s.deviceClasses {
result = append(result, class)
}
return result, nil
}

func (s snapshotClassLister) Get(className string) (*resourceapi.DeviceClass, error) {
class, found := s.deviceClasses[className]
if !found {
return nil, fmt.Errorf("DeviceClass %q not found", className)
}
return class, nil
}
32 changes: 32 additions & 0 deletions cluster-autoscaler/dynamicresources/snapshot_slice_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import resourceapi "k8s.io/api/resource/v1alpha3"

type snapshotSliceLister Snapshot

func (s snapshotSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
var result []*resourceapi.ResourceSlice
for _, slices := range s.resourceSlicesByNodeName {
for _, slice := range slices {
result = append(result, slice)
}
}
result = append(result, s.nonNodeLocalResourceSlices...)
return result, nil
}
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTransform(trim))

fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig)
fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled)
if err != nil {
return nil, nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// ClusterSnapshot is abstraction of cluster state used for predicate simulations.
Expand Down Expand Up @@ -55,7 +54,7 @@ type ClusterSnapshot interface {
// SnapshotBase is the "low-level" part of ClusterSnapshot. Mutation methods modify the snapshot state directly, without going
// through scheduler predicates.
type SnapshotBase interface {
schedulerframework.SharedLister
framework.SharedLister

// SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -320,7 +321,7 @@ func newTestPluginRunner(snapshotBase clustersnapshot.SnapshotBase, schedConfig
schedConfig = defaultConfig
}

fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig)
fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ func NewPredicateSnapshot(snapshotBase clustersnapshot.SnapshotBase, fwHandle *f
}
}

func (s *PredicateSnapshot) ResourceClaims() schedulerframework.ResourceClaimTracker {
return nil
}

func (s *PredicateSnapshot) ResourceSlices() schedulerframework.ResourceSliceLister {
return nil
}

func (s *PredicateSnapshot) DeviceClasses() schedulerframework.DeviceClassLister {
return nil
}

// SchedulePod adds pod to the snapshot and schedules it to given node.
func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
Expand Down
81 changes: 79 additions & 2 deletions cluster-autoscaler/simulator/framework/delegating_shared_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ package framework
import (
"fmt"

resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/types"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type SharedLister interface {
schedulerframework.SharedLister
schedulerframework.SharedDraManager
}

// DelegatingSchedulerSharedLister is an implementation of scheduler.SharedLister which
// passes logic to delegate. Delegate can be updated.
type DelegatingSchedulerSharedLister struct {
delegate schedulerframework.SharedLister
delegate SharedLister
}

// NewDelegatingSchedulerSharedLister creates new NewDelegatingSchedulerSharedLister
Expand All @@ -45,8 +52,20 @@ func (lister *DelegatingSchedulerSharedLister) StorageInfos() schedulerframework
return lister.delegate.StorageInfos()
}

func (lister *DelegatingSchedulerSharedLister) ResourceClaims() schedulerframework.ResourceClaimTracker {
return lister.delegate.ResourceClaims()
}

func (lister *DelegatingSchedulerSharedLister) ResourceSlices() schedulerframework.ResourceSliceLister {
return lister.delegate.ResourceSlices()
}

func (lister *DelegatingSchedulerSharedLister) DeviceClasses() schedulerframework.DeviceClassLister {
return lister.delegate.DeviceClasses()
}

// UpdateDelegate updates the delegate
func (lister *DelegatingSchedulerSharedLister) UpdateDelegate(delegate schedulerframework.SharedLister) {
func (lister *DelegatingSchedulerSharedLister) UpdateDelegate(delegate SharedLister) {
lister.delegate = delegate
}

Expand All @@ -58,6 +77,9 @@ func (lister *DelegatingSchedulerSharedLister) ResetDelegate() {
type unsetSharedLister struct{}
type unsetNodeInfoLister unsetSharedLister
type unsetStorageInfoLister unsetSharedLister
type unsetDeviceClassLister unsetSharedLister
type unsetResourceSliceLister unsetSharedLister
type unsetResourceClaimsTracker unsetSharedLister

// List always returns an error
func (lister *unsetNodeInfoLister) List() ([]*schedulerframework.NodeInfo, error) {
Expand All @@ -82,6 +104,48 @@ func (lister *unsetNodeInfoLister) Get(nodeName string) (*schedulerframework.Nod
func (lister *unsetStorageInfoLister) IsPVCUsedByPods(key string) bool {
return false
}
func (u unsetDeviceClassLister) List() ([]*resourceapi.DeviceClass, error) {
return nil, fmt.Errorf("lister not set in delegate")
}

func (u unsetDeviceClassLister) Get(className string) (*resourceapi.DeviceClass, error) {
return nil, fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
return nil, fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceClaimsTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
return nil, fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceClaimsTracker) List() ([]*resourceapi.ResourceClaim, error) {
return nil, fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceClaimsTracker) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
return nil, fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceClaimsTracker) SignalClaimPendingAllocation(claimUid types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
return fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceClaimsTracker) ClaimHasPendingAllocation(claimUid types.UID) bool {
return false
}

func (u unsetResourceClaimsTracker) RemoveClaimPendingAllocation(claimUid types.UID) (deleted bool) {
return false
}

func (u unsetResourceClaimsTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
return fmt.Errorf("lister not set in delegate")
}

func (u unsetResourceClaimsTracker) AssumedClaimRestore(namespace, claimName string) {
}

// NodeInfos: Pods returns a fake NodeInfoLister which always returns an error
func (lister *unsetSharedLister) NodeInfos() schedulerframework.NodeInfoLister {
Expand All @@ -93,4 +157,17 @@ func (lister *unsetSharedLister) StorageInfos() schedulerframework.StorageInfoLi
return (*unsetStorageInfoLister)(lister)
}

func (lister *unsetSharedLister) ResourceClaims() schedulerframework.ResourceClaimTracker {
return (*unsetResourceClaimsTracker)(lister)
}

func (lister *unsetSharedLister) ResourceSlices() schedulerframework.ResourceSliceLister {
return (*unsetResourceSliceLister)(lister)
}

func (lister *unsetSharedLister) DeviceClasses() schedulerframework.DeviceClassLister {
return (*unsetDeviceClassLister)(lister)

}

var unsetSharedListerSingleton *unsetSharedLister
Loading

0 comments on commit e6b767a

Please sign in to comment.