From dd902108f98ac4b8358adfcc85bc91064f004037 Mon Sep 17 00:00:00 2001 From: cedric lamoriniere Date: Mon, 20 Sep 2021 12:22:43 +0200 Subject: [PATCH] [local] Add NodeInfos PodTemplates process implementation --- .../node_info_with_podtemplates_processors.go | 164 ++++++++++ ..._info_with_podtemplates_processors_test.go | 307 ++++++++++++++++++ 2 files changed, 471 insertions(+) create mode 100644 cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors.go create mode 100644 cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors_test.go diff --git a/cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors.go b/cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors.go new file mode 100644 index 000000000000..e00927f3f38e --- /dev/null +++ b/cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package podtemplates
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+
+	client "k8s.io/client-go/kubernetes"
+	v1lister "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+
+	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+const (
+	// PodTemplateDaemonSetLabelKey is used as the label key on a PodTemplate corresponding to an extra DaemonSet.
+	PodTemplateDaemonSetLabelKey = "cluster-autoscaler.kubernetes.io/daemonset-pod"
+	// PodTemplateDaemonSetLabelValueTrue is used as the PodTemplateDaemonSetLabelKey label value.
+	PodTemplateDaemonSetLabelValueTrue = "true"
+)
+
+// NewNodeInfoWithPodTemplateProcessor returns a default instance of NodeInfoProcessor.
+func NewNodeInfoWithPodTemplateProcessor(opts *core.AutoscalerOptions) nodeinfos.NodeInfoProcessor {
+	internalContext, cancelFunc := context.WithCancel(context.Background())
+
+	return &nodeInfoWithPodTemplateProcessor{
+		ctx:               internalContext,
+		cancelFunc:        cancelFunc,
+		podTemplateLister: newPodTemplateLister(opts.KubeClient, internalContext.Done()),
+	}
+}
+
+// nodeInfoWithPodTemplateProcessor adds Pods built from matching PodTemplates to nodeInfos.
+type nodeInfoWithPodTemplateProcessor struct {
+	podTemplateLister v1lister.PodTemplateLister
+
+	ctx        context.Context
+	cancelFunc func()
+}
+
+// Process returns nodeInfos decorated with one extra Pod per schedulable PodTemplate.
+func (p *nodeInfoWithPodTemplateProcessor) Process(ctx *ca_context.AutoscalingContext, nodeInfosForNodeGroups map[string]*schedulerframework.NodeInfo) (map[string]*schedulerframework.NodeInfo, error) {
+	// here we can use empty snapshot, since the NodeInfos that will be updated
+	// are from CloudProvider NodeTemplates.
+	clusterSnapshot := simulator.NewBasicClusterSnapshot()
+
+	// retrieve the podTemplates list only once.
+	// This list will be used for each NodeGroup.
+	podTemplates, err := p.podTemplateLister.List(labels.Everything())
+	if err != nil {
+		return nil, errors.ToAutoscalerError(errors.InternalError, err)
+	}
+
+	result := make(map[string]*schedulerframework.NodeInfo, len(nodeInfosForNodeGroups))
+	var errs []error
+	for id, nodeInfo := range nodeInfosForNodeGroups {
+		var newNodeInfo *schedulerframework.NodeInfo
+		var err error
+
+		// At Datadog we only receive nodeInfos from Cloud provider templates.
+		// It means that every nodeInfo needs to be decorated with podTemplates.
+		newNodeInfo, err = getNodeInfoWithPodTemplates(nodeInfo, podTemplates, clusterSnapshot, ctx.PredicateChecker)
+		if err != nil {
+			errs = append(errs, err)
+		}
+
+		result[id] = newNodeInfo
+	}
+
+	return result, utilerrors.NewAggregate(errs)
+}
+
+// CleanUp cleans up processor's internal structures.
+func (p *nodeInfoWithPodTemplateProcessor) CleanUp() { + p.cancelFunc() +} + +func newPodTemplateLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.PodTemplateLister { + podTemplateWatchOption := func(options *metav1.ListOptions) { + options.FieldSelector = fields.Everything().String() + options.LabelSelector = labels.SelectorFromSet(getDaemonsetPodTemplateLabelSet()).String() + } + listWatcher := cache.NewFilteredListWatchFromClient(kubeClient.CoreV1().RESTClient(), "podtemplates", apiv1.NamespaceAll, podTemplateWatchOption) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.PodTemplate{}, time.Hour) + lister := v1lister.NewPodTemplateLister(store) + go reflector.Run(stopchannel) + return lister +} + +const nodeInfoDeepCopySuffix = "podtemplate" + +func getNodeInfoWithPodTemplates(baseNodeInfo *schedulerframework.NodeInfo, podTemplates []*apiv1.PodTemplate, clusterSnapshot *simulator.BasicClusterSnapshot, predicateChecker simulator.PredicateChecker) (*schedulerframework.NodeInfo, error) { + // clone baseNodeInfo to not modify the input object. 
+ newNodeInfo := scheduler_utils.DeepCopyTemplateNode(baseNodeInfo, nodeInfoDeepCopySuffix) + node := newNodeInfo.Node() + var pods []*apiv1.Pod + + for _, podInfo := range baseNodeInfo.Pods { + pods = append(pods, podInfo.Pod) + } + + if err := clusterSnapshot.AddNodeWithPods(node, pods); err != nil { + return nil, err + } + + for _, podTpl := range podTemplates { + newPod := newPod(podTpl, node.Name) + err := predicateChecker.CheckPredicates(clusterSnapshot, newPod, node.Name) + if err == nil { + newNodeInfo.AddPod(newPod) + } else if err.ErrorType() == simulator.NotSchedulablePredicateError { + // ok; we are just skipping this daemonset + } else { + // unexpected error + return nil, fmt.Errorf("unexpected error while calling PredicateChecker; %v", err) + } + } + + return newNodeInfo, nil +} + +func getDaemonsetPodTemplateLabelSet() labels.Set { + daemonsetPodTemplateLabels := map[string]string{ + PodTemplateDaemonSetLabelKey: PodTemplateDaemonSetLabelValueTrue, + } + return labels.Set(daemonsetPodTemplateLabels) +} + +func newPod(pt *apiv1.PodTemplate, nodeName string) *apiv1.Pod { + newPod := &apiv1.Pod{Spec: pt.Template.Spec, ObjectMeta: pt.Template.ObjectMeta} + newPod.Namespace = pt.Namespace + newPod.Name = fmt.Sprintf("%s-pod", pt.Name) + newPod.Spec.NodeName = nodeName + return newPod +} diff --git a/cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors_test.go b/cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors_test.go new file mode 100644 index 000000000000..99a69f9eeb1a --- /dev/null +++ b/cluster-autoscaler/processors/datadog/nodeinfos/podtemplates/node_info_with_podtemplates_processors_test.go @@ -0,0 +1,307 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtemplates + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + apiv1 "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + + v1lister "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + + ca_context "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/simulator" + scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" +) + +func Test_getNodeInfoWithPodTemplates(t *testing.T) { + nodeName1 := "template-node-template-for-node-1" + nodePod1 := newTestPod("bar", "foo", &apiv1.PodSpec{}, nodeName1) + nodeInfo := newNodeInfo(nodeName1, 42, nodePod1) + nodeInfoUnschedulable := newNodeInfo(nodeName1, 0, nodePod1) + + type args struct { + baseNodeInfo *schedulerframework.NodeInfo + podTemplates []*apiv1.PodTemplate + } + tests := []struct { + name string + args args + wantFunc func() *schedulerframework.NodeInfo + wantErr bool + }{ + { + name: "nodeInfo should not be updated", + args: args{ + baseNodeInfo: nodeInfo.Clone(), + podTemplates: nil, + }, + wantFunc: func() *schedulerframework.NodeInfo { + return scheduler_utils.DeepCopyTemplateNode(nodeInfo, nodeInfoDeepCopySuffix) + }, + }, + { + name: "nodeInfo contains one additional Pod", + args: args{ + baseNodeInfo: nodeInfo.Clone(), + podTemplates: []*apiv1.PodTemplate{ + newPodTemplate("extra-ns", "extra-name", 
nil), + }, + }, + wantFunc: func() *schedulerframework.NodeInfo { + nodeInfo := scheduler_utils.DeepCopyTemplateNode(nodeInfo, nodeInfoDeepCopySuffix) + nodeInfo.AddPod(newTestPod("extra-ns", "extra-name-pod", nil, nodeName1+"-podtemplate")) + return nodeInfo + }, + }, + { + name: "nodeInfo unschedulable", + args: args{ + baseNodeInfo: nodeInfoUnschedulable.Clone(), + podTemplates: []*apiv1.PodTemplate{ + newPodTemplate("extra-ns", "extra-name", nil), + }, + }, + wantFunc: func() *schedulerframework.NodeInfo { + return scheduler_utils.DeepCopyTemplateNode(nodeInfoUnschedulable, nodeInfoDeepCopySuffix) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clusterSnapshot := simulator.NewBasicClusterSnapshot() + predicateChecker, _ := simulator.NewTestPredicateChecker() + + got, err := getNodeInfoWithPodTemplates(tt.args.baseNodeInfo, tt.args.podTemplates, clusterSnapshot, predicateChecker) + if (err != nil) != tt.wantErr { + t.Errorf("getNodeInfoWithPodTemplates() error = %v, wantErr %v", err, tt.wantErr) + return + } + want := tt.wantFunc() + + got.Generation = want.Generation + + resetNodeInfoGeneratedFields(got) + resetNodeInfoGeneratedFields(want) + assert.EqualValues(t, want, got, "getNodeInfoWithPodTemplates wrong expected value") + }) + } +} + +func Test_nodeInfoWithPodTemplateProcessor_Process(t *testing.T) { + namespace := "bar" + podName := "foo-dfsdfds" + nodeName1 := "template-node-for-template-node-1" + wantNodeName1 := fmt.Sprintf("%s-%s", nodeName1, nodeInfoDeepCopySuffix) + + tests := []struct { + name string + podTemplates []*apiv1.PodTemplate + podListerCreation func(pts []*apiv1.PodTemplate) (v1lister.PodTemplateLister, error) + nodeInfosForNodeGroups map[string]*schedulerframework.NodeInfo + want map[string]*schedulerframework.NodeInfo + wantErr bool + }{ + { + name: "1 pod added", + podTemplates: []*apiv1.PodTemplate{newPodTemplate(namespace, podName, nil)}, + podListerCreation: newTestDaemonSetLister, + 
nodeInfosForNodeGroups: map[string]*schedulerframework.NodeInfo{ + nodeName1: newNodeInfo(nodeName1, 42), + }, + want: map[string]*schedulerframework.NodeInfo{ + nodeName1: newNodeInfo(wantNodeName1, 42, newTestPod(namespace, fmt.Sprintf("%s-pod", podName), nil, wantNodeName1)), + }, + wantErr: false, + }, + { + name: "0 pod added: unschedulable node", + podTemplates: []*apiv1.PodTemplate{newPodTemplate(namespace, podName, nil)}, + podListerCreation: newTestDaemonSetLister, + nodeInfosForNodeGroups: map[string]*schedulerframework.NodeInfo{ + nodeName1: newNodeInfo(nodeName1, 0), + }, + want: map[string]*schedulerframework.NodeInfo{ + nodeName1: newNodeInfo(wantNodeName1, 0), + }, + wantErr: false, + }, + { + name: "pod lister error", + podTemplates: []*apiv1.PodTemplate{newPodTemplate(namespace, podName, nil)}, + podListerCreation: func(pts []*apiv1.PodTemplate) (v1lister.PodTemplateLister, error) { + podLister := &podTemplateListerMock{} + podLister.On("List").Return(pts, fmt.Errorf("unable to list")) + return podLister, nil + }, + nodeInfosForNodeGroups: map[string]*schedulerframework.NodeInfo{ + nodeName1: newNodeInfo(nodeName1, 0), + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + podLister, err := tt.podListerCreation(tt.podTemplates) + assert.NoError(t, err, "err should be nil") + + ctx, cancelFunc := context.WithCancel(context.Background()) + + p := &nodeInfoWithPodTemplateProcessor{ + podTemplateLister: podLister, + ctx: ctx, + cancelFunc: cancelFunc, + } + + caCtx, err := newTestClusterAutoscalerContext() + assert.NoError(t, err, "err should be nil") + + got, err := p.Process(caCtx, tt.nodeInfosForNodeGroups) + + resetNodeInfosGeneratedFields(got) + resetNodeInfosGeneratedFields(tt.want) + + if (err != nil) != tt.wantErr { + t.Errorf("nodeInfoWithPodTemplateProcessor.Process() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.EqualValues(t, tt.want, got, 
"nodeInfoWithPodTemplateProcessor.Process() wrong expected value") + }) + } +} + +func newTestDaemonSetLister(pts []*apiv1.PodTemplate) (v1lister.PodTemplateLister, error) { + store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, pt := range pts { + err := store.Add(pt) + if err != nil { + return nil, fmt.Errorf("Error adding object to cache: %v", err) + } + } + return v1lister.NewPodTemplateLister(store), nil +} + +func newTestClusterAutoscalerContext() (*ca_context.AutoscalingContext, error) { + predicateChecker, err := simulator.NewTestPredicateChecker() + if err != nil { + return nil, err + } + + ctx := &ca_context.AutoscalingContext{ + PredicateChecker: predicateChecker, + } + return ctx, nil +} + +func newNode(name string, maxPods int64) *apiv1.Node { + // define a resource list to allow pod scheduling on this node. + newResourceList := apiv1.ResourceList{ + apiv1.ResourcePods: *resource.NewQuantity(maxPods, resource.DecimalSI), + } + return &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "kubernetes.io/hostname": name, + }, + }, + Status: apiv1.NodeStatus{ + Allocatable: newResourceList, + }, + } +} + +func newNodeInfo(nodeName string, maxPod int64, pods ...*apiv1.Pod) *schedulerframework.NodeInfo { + node1 := newNode(nodeName, maxPod) + nodeInfo := schedulerframework.NewNodeInfo(pods...) 
+ nodeInfo.SetNode(node1) + + return nodeInfo +} + +func newPodTemplate(namespace, name string, spec *apiv1.PodTemplateSpec) *apiv1.PodTemplate { + pt := &apiv1.PodTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + if spec != nil { + pt.Template = *spec + } + + return pt +} + +func newTestPod(namespace, name string, spec *apiv1.PodSpec, nodeName string) *apiv1.Pod { + newPod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + newPod.Namespace = namespace + newPod.Name = name + if spec != nil { + newPod.Spec = *spec + } + newPod.Spec.NodeName = nodeName + return newPod +} + +func resetNodeInfosGeneratedFields(nodeInfos map[string]*schedulerframework.NodeInfo) { + for _, nodeInfo := range nodeInfos { + resetNodeInfoGeneratedFields(nodeInfo) + } +} + +func resetNodeInfoGeneratedFields(nodeInfo *schedulerframework.NodeInfo) { + nodeInfo.Generation = 0 + nodeInfo.Node().UID = "" + for _, podInfo := range nodeInfo.Pods { + podInfo.Pod.UID = "" + } +} + +type podTemplateListerMock struct { + mock.Mock +} + +// List lists all PodTemplates in the indexer. +func (p *podTemplateListerMock) List(selector labels.Selector) (ret []*apiv1.PodTemplate, err error) { + args := p.Called() + return args.Get(0).([]*apiv1.PodTemplate), args.Error(1) +} + +// PodTemplates returns an object that can list and get PodTemplates. +func (p *podTemplateListerMock) PodTemplates(namespace string) v1lister.PodTemplateNamespaceLister { + args := p.Called(namespace) + return args.Get(0).(v1lister.PodTemplateNamespaceLister) +}