Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update scheduler initialization in predicates.go #1794

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 75 additions & 24 deletions cluster-autoscaler/simulator/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"strings"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
informers "k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

Expand Down Expand Up @@ -64,36 +68,79 @@ func init() {
algorithmprovider.ApplyFeatureGates()
}

// NoOpEventRecorder is a noop implementation of EventRecorder
type NoOpEventRecorder struct{}

// Event is a noop method implementation
func (NoOpEventRecorder) Event(object runtime.Object, eventtype, reason, message string) {
}

// Eventf is a noop method implementation
func (NoOpEventRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
}

// PastEventf is a noop method implementation
func (NoOpEventRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}

// AnnotatedEventf is a noop method implementation
func (NoOpEventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
}

// NewPredicateChecker builds PredicateChecker.
func NewPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*PredicateChecker, error) {
provider, err := factory.GetAlgorithmProvider(factory.DefaultProvider)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

schedulerConfigFactory := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: "cluster-autoscaler",
algorithmProvider := factory.DefaultProvider

// Set up the configurator which can create schedulers from configs.
nodeInformer := informerFactory.Core().V1().Nodes()
podInformer := informerFactory.Core().V1().Pods()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
replicationControllerInformer := informerFactory.Core().V1().ReplicationControllers()
replicaSetInformer := informerFactory.Apps().V1().ReplicaSets()
statefulSetInformer := informerFactory.Apps().V1().StatefulSets()
serviceInformer := informerFactory.Core().V1().Services()
pdbInformer := informerFactory.Policy().V1beta1().PodDisruptionBudgets()
storageClassInformer := informerFactory.Storage().V1().StorageClasses()
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: apiv1.DefaultSchedulerName,
Client: kubeClient,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: apiv1.DefaultHardPodAffinitySymmetricWeight,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: scheduler.BindTimeoutSeconds,
})
informerFactory.Start(stop)

metadataProducer, err := schedulerConfigFactory.GetPredicateMetadataProducer()
// Create the config from a named algorithm provider.
config, err := configurator.CreateFromProvider(algorithmProvider)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", algorithmProvider, err)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = NoOpEventRecorder{}
config.DisablePreemption = false
config.StopEverything = stop

// Create the scheduler.
sched := scheduler.NewFromConfig(config)

scheduler.AddAllEventHandlers(sched, apiv1.DefaultSchedulerName,
nodeInformer, podInformer, pvInformer, pvcInformer, replicationControllerInformer, replicaSetInformer, statefulSetInformer, serviceInformer, pdbInformer, storageClassInformer)

predicateMap := map[string]predicates.FitPredicate{}
for predicateName, predicateFunc := range sched.Config().Algorithm.Predicates() {
predicateMap[predicateName] = predicateFunc
}
predicateMap, err := schedulerConfigFactory.GetPredicates(provider.FitPredicateKeys)
predicateMap["ready"] = isNodeReadyAndSchedulablePredicate
if err != nil {
return nil, err
Expand Down Expand Up @@ -121,8 +168,12 @@ func NewPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{})
klog.V(1).Infof("Using predicate %s", predInfo.name)
}

// TODO: Verify that run is not needed anymore.
// schedulerConfigFactory.Run()
informerFactory.Start(stop)

metadataProducer, err := configurator.GetPredicateMetadataProducer()
if err != nil {
return nil, fmt.Errorf("could not obtain predicateMetadataProducer; %v", err.Error())
}

return &PredicateChecker{
predicates: predicateList,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading