From 98a21c79b165acecb1870d2c64e05c94400f6814 Mon Sep 17 00:00:00 2001 From: Xuecheng Date: Tue, 27 Sep 2022 15:05:55 +0800 Subject: [PATCH] add auto pod upgrade controller for daemoset (#970) * add auto pod upgrade controller for daemoset Signed-off-by: hxcGit --- .../app/controllermanager.go | 2 +- cmd/yurt-controller-manager/app/core.go | 14 + go.mod | 1 + .../daemon_pod_updater_controller.go | 523 ++++++++++++++++++ .../daemon_pod_updater_controller_test.go | 514 +++++++++++++++++ .../kubernetes/controller_utils.go | 511 +++++++++++++++++ .../kubernetes/controller_utils_test.go | 282 ++++++++++ .../daemonpodupdater/kubernetes/pod_util.go | 83 +++ .../kubernetes/pod_util_test.go | 80 +++ pkg/controller/daemonpodupdater/util.go | 248 +++++++++ pkg/controller/daemonpodupdater/util_test.go | 230 ++++++++ 11 files changed, 2487 insertions(+), 1 deletion(-) create mode 100644 pkg/controller/daemonpodupdater/daemon_pod_updater_controller.go create mode 100644 pkg/controller/daemonpodupdater/daemon_pod_updater_controller_test.go create mode 100644 pkg/controller/daemonpodupdater/kubernetes/controller_utils.go create mode 100644 pkg/controller/daemonpodupdater/kubernetes/controller_utils_test.go create mode 100644 pkg/controller/daemonpodupdater/kubernetes/pod_util.go create mode 100644 pkg/controller/daemonpodupdater/kubernetes/pod_util_test.go create mode 100644 pkg/controller/daemonpodupdater/util.go create mode 100644 pkg/controller/daemonpodupdater/util_test.go diff --git a/cmd/yurt-controller-manager/app/controllermanager.go b/cmd/yurt-controller-manager/app/controllermanager.go index 0354dc0bf8c..9fea3d8ca9c 100644 --- a/cmd/yurt-controller-manager/app/controllermanager.go +++ b/cmd/yurt-controller-manager/app/controllermanager.go @@ -305,7 +305,7 @@ func NewControllerInitializers() map[string]InitFunc { controllers := map[string]InitFunc{} controllers["nodelifecycle"] = startNodeLifecycleController controllers["yurtcsrapprover"] = startYurtCSRApproverController - + controllers["daemonpodupdater"] = startDaemonPodUpdaterController return controllers } diff --git a/cmd/yurt-controller-manager/app/core.go b/cmd/yurt-controller-manager/app/core.go index f84777b4d3c..fce440de10d 100644 --- a/cmd/yurt-controller-manager/app/core.go +++ b/cmd/yurt-controller-manager/app/core.go @@ -26,6 +26,7 @@ import ( "time" "github.com/openyurtio/openyurt/pkg/controller/certificates" + daemonpodupdater "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" lifecyclecontroller "github.com/openyurtio/openyurt/pkg/controller/nodelifecycle" ) @@ -65,3 +66,16 @@ func startYurtCSRApproverController(ctx ControllerContext) (http.Handler, bool, return nil, true, nil } + +func startDaemonPodUpdaterController(ctx ControllerContext) (http.Handler, bool, error) { + daemonPodUpdaterCtrl := daemonpodupdater.NewController( + ctx.ClientBuilder.ClientOrDie("daemonPodUpdater-controller"), + ctx.InformerFactory.Apps().V1().DaemonSets(), + ctx.InformerFactory.Core().V1().Nodes(), + ctx.InformerFactory.Core().V1().Pods(), + ) + + go daemonPodUpdaterCtrl.Run(2, ctx.Stop) + + return nil, true, nil +} diff --git a/go.mod b/go.mod index ef66c33d9bf..fd6966e5938 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Masterminds/semver/v3 v3.1.1 github.com/Microsoft/go-winio v0.4.15 github.com/aliyun/alibaba-cloud-sdk-go v1.61.579 + github.com/davecgh/go-spew v1.1.1 github.com/daviddengcn/go-colortext v1.0.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/emicklei/go-restful v2.12.0+incompatible // 
indirect diff --git a/pkg/controller/daemonpodupdater/daemon_pod_updater_controller.go b/pkg/controller/daemonpodupdater/daemon_pod_updater_controller.go new file mode 100644 index 00000000000..194c37b7578 --- /dev/null +++ b/pkg/controller/daemonpodupdater/daemon_pod_updater_controller.go @@ -0,0 +1,523 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemonpodupdater + +import ( + "context" + "fmt" + "sync" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + appsinformers "k8s.io/client-go/informers/apps/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + client "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + appslisters "k8s.io/client-go/listers/apps/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + k8sutil "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater/kubernetes" +) + +const ( + // UpdateAnnotation is the annotation key used in daemonset spec to indicate + // which update strategy is selected. Currently, "ota" and "auto" are supported. + UpdateAnnotation = "apps.openyurt.io/update-strategy" + + // OTAUpdate set daemonset to over-the-air update mode. + // In daemonPodUpdater controller, we add PodNeedUpgrade condition to pods. + OTAUpdate = "ota" + // AutoUpdate set daemonset to auto update mode. + // In this mode, daemonset will keep updating even if there are not-ready nodes. + // For more details, see https://github.com/openyurtio/openyurt/pull/921. + AutoUpdate = "auto" + + // PodNeedUpgrade indicates whether the pod is able to upgrade. + PodNeedUpgrade corev1.PodConditionType = "PodNeedUpgrade" + + // MaxUnavailableAnnotation is the annotation key added to daemonset to indicate + // the max unavailable pods number. It's used with "apps.openyurt.io/update-strategy=auto". + // If this annotation is not explicitly stated, it will be set to the default value 1. + MaxUnavailableAnnotation = "apps.openyurt.io/max-unavailable" + + // BurstReplicas is a rate limiter for booting pods on a lot of pods. + // The value of 250 is chosen b/c values that are too high can cause registry DoS issues. + BurstReplicas = 250 + + maxRetries = 30 +) + +// controllerKind contains the schema.GroupVersionKind for this controller type. 
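+// For example, a DaemonSet opts in to this controller with annotations like the
+// following (an illustrative snippet, not part of any manifest in this patch;
+// the keys and values are the constants defined above):
+//
+//	metadata:
+//	  annotations:
+//	    apps.openyurt.io/update-strategy: "auto"
+//	    apps.openyurt.io/max-unavailable: "2"
+//
+// The test cases further below also suggest the DaemonSet is expected to use the
+// OnDelete update strategy (see checkPrerequisites).
+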
+var controllerKind = appsv1.SchemeGroupVersion.WithKind("DaemonSet")
+
+type Controller struct {
+	kubeclientset client.Interface
+	podControl    k8sutil.PodControlInterface
+	// daemonPodUpdater watches daemonset, node and pod resource
+	daemonsetLister appslisters.DaemonSetLister
+	daemonsetSynced cache.InformerSynced
+	nodeLister      corelisters.NodeLister
+	nodeSynced      cache.InformerSynced
+	podLister       corelisters.PodLister
+	podSynced       cache.InformerSynced
+	daemonsetWorkqueue workqueue.RateLimitingInterface
+	expectations       k8sutil.ControllerExpectationsInterface
+}
+
+func NewController(kc client.Interface, daemonsetInformer appsinformers.DaemonSetInformer,
+	nodeInformer coreinformers.NodeInformer, podInformer coreinformers.PodInformer) *Controller {
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartStructuredLogging(0)
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kc.CoreV1().Events("")})
+
+	ctrl := Controller{
+		kubeclientset: kc,
+		// Use PodControlInterface to delete pods, which is convenient for testing
+		podControl: k8sutil.RealPodControl{
+			KubeClient: kc,
+			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "daemonPodUpdater"}),
+		},
+
+		daemonsetLister: daemonsetInformer.Lister(),
+		daemonsetSynced: daemonsetInformer.Informer().HasSynced,
+
+		nodeLister: nodeInformer.Lister(),
+		nodeSynced: nodeInformer.Informer().HasSynced,
+
+		podLister: podInformer.Lister(),
+		podSynced: podInformer.Informer().HasSynced,
+
+		daemonsetWorkqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		expectations:       k8sutil.NewControllerExpectations(),
+	}
+
+	// In this controller, we focus on three cases:
+	// 1. daemonset specification changes
+	// 2. node turns from not-ready to ready
+	// 3. pods were deleted successfully
+	// In case 2, daemonset.Status.DesiredNumberScheduled will change and, in case 3, daemonset.Status.NumberReady
+	// will change. Therefore, we focus only on the daemonset Update event, which can cover the above situations.
+	daemonsetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(old, new interface{}) {
+			newDS := new.(*appsv1.DaemonSet)
+			oldDS := old.(*appsv1.DaemonSet)
+
+			// Only handle daemonsets that meet the prerequisites
+			if !checkPrerequisites(newDS) {
+				return
+			}
+
+			if newDS.ResourceVersion == oldDS.ResourceVersion {
+				return
+			}
+
+			klog.V(5).Infof("Got daemonset update event: %v", newDS.Name)
+			ctrl.enqueueDaemonSet(newDS)
+		},
+	})
+
+	// Watch for deletion of pods. The reason we watch is that we don't want a daemon set to delete
+	// more pods until all the effects (expectations) of a daemon set's delete have been observed.
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		DeleteFunc: ctrl.deletePod,
+	})
+	return &ctrl
+}
+
+func (c *Controller) enqueueDaemonSet(ds *appsv1.DaemonSet) {
+	key, err := cache.MetaNamespaceKeyFunc(ds)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
+		return
+	}
+
+	klog.V(5).Infof("Daemonset %v queued", key)
+	c.daemonsetWorkqueue.Add(key)
+}
+
+func (c *Controller) deletePod(obj interface{}) {
+	pod, ok := obj.(*corev1.Pod)
+	// When a deletion is dropped, the relist will notice a pod in the store not
+	// in the list, leading to the insertion of a tombstone object which contains
+	// the deleted key/value. Note that this value might be stale.
If the pod + // changed labels the new daemonset will not be woken up till the periodic + // resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + pod, ok = tombstone.Obj.(*corev1.Pod) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) + return + } + } + + klog.V(5).Infof("Daemonset pod %s deleted.", pod.Name) + + controllerRef := metav1.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return + } + ds := c.resolveControllerRef(pod.Namespace, controllerRef) + if ds == nil { + return + } + + // Only care daemonset meets prerequisites + if !checkPrerequisites(ds) { + return + } + dsKey, err := cache.MetaNamespaceKeyFunc(ds) + if err != nil { + utilruntime.HandleError(err) + return + } + + c.expectations.DeletionObserved(dsKey) +} + +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + klog.Info("Starting daemonPodUpdater controller") + defer klog.Info("Shutting down daemonPodUpdater controller") + defer c.daemonsetWorkqueue.ShutDown() + + // Synchronize the cache before starting to process events + if !cache.WaitForCacheSync(stopCh, c.daemonsetSynced, c.nodeSynced, c.podSynced) { + klog.Error("sync daemonPodUpdater controller timeout") + } + + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh +} + +func (c *Controller) runWorker() { + for { + obj, shutdown := c.daemonsetWorkqueue.Get() + if shutdown { + return + } + + if err := c.syncHandler(obj.(string)); err != nil { + if c.daemonsetWorkqueue.NumRequeues(obj) < maxRetries { + klog.Infof("error syncing event %v: %v", obj, err) + c.daemonsetWorkqueue.AddRateLimited(obj) + c.daemonsetWorkqueue.Done(obj) + continue + } + utilruntime.HandleError(err) + } + + c.daemonsetWorkqueue.Forget(obj) + c.daemonsetWorkqueue.Done(obj) + } +} + +func (c *Controller) syncHandler(key string) error { + defer func() { + klog.V(4).Infof("Finish syncing daemonPodUpdater request %q", key) + }() + + klog.V(4).Infof("Start handling daemonPodUpdater request %q", key) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("invalid resource key: %s", key) + } + + // Daemonset that need to be synced + ds, err := c.daemonsetLister.DaemonSets(namespace).Get(name) + if err != nil { + if apierrors.IsNotFound(err) { + c.expectations.DeleteExpectations(key) + return nil + } + return err + } + + if ds.DeletionTimestamp != nil { + return nil + } + + // Only process daemonset that meets expectations + // Otherwise, wait native daemonset controller reconciling + if !c.expectations.SatisfiedExpectations(key) { + return nil + } + + // Recheck required annotation + v, ok := ds.Annotations[UpdateAnnotation] + if !ok { + return fmt.Errorf("won't sync daemonset %q without annotation 'apps.openyurt.io/update-strategy'", ds.Name) + } + + switch v { + case OTAUpdate: + if err := c.otaUpdate(ds); err != nil { + return err + } + + case AutoUpdate: + if err := c.autoUpdate(ds); err != nil { + return err + } + default: + return fmt.Errorf("unknown annotation type %v", v) + } + + return nil +} + +// otaUpdate compare every pod to its owner daemonset to check if pod is updatable +// If pod is in line with the latest daemonset spec, set pod condition "PodNeedUpgrade" to "false" +// while not, set pod 
condition "PodNeedUpgrade" to "true" +func (c *Controller) otaUpdate(ds *appsv1.DaemonSet) error { + pods, err := GetDaemonsetPods(c.podLister, ds) + if err != nil { + return err + } + + for _, pod := range pods { + if err := SetPodUpgradeCondition(c.kubeclientset, ds, pod); err != nil { + return err + } + } + return nil +} + +// autoUpdate identifies the set of old pods to delete within the constraints imposed by the max-unavailable number. +// Just ignore and do not calculate not-ready nodes. +func (c *Controller) autoUpdate(ds *appsv1.DaemonSet) error { + nodeToDaemonPods, err := c.getNodesToDaemonPods(ds) + if err != nil { + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + } + + // Calculate maxUnavailable specified by user, default is 1 + maxUnavailable, err := c.maxUnavailableCounts(ds, nodeToDaemonPods) + if err != nil { + return fmt.Errorf("couldn't get maxUnavailable number for daemon set %q: %v", ds.Name, err) + } + + var numUnavailable int + var allowedReplacementPods []string + var candidatePodsToDelete []string + + for nodeName, pods := range nodeToDaemonPods { + // Check if node is ready, ignore not-ready node + // this is a significant difference from the native daemonset controller + ready, err := NodeReadyByName(c.nodeLister, nodeName) + if err != nil { + return fmt.Errorf("couldn't check node %q ready status, %v", nodeName, err) + } + if !ready { + continue + } + + newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods) + if !ok { + // Let the manage loop clean up this node, and treat it as an unavailable node + klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName) + numUnavailable++ + continue + } + switch { + case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil: + // The manage loop will handle creating or deleting the appropriate pod, consider this unavailable + numUnavailable++ + case newPod != nil: + // This pod is up-to-date, check its availability + if !k8sutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: time.Now()}) { + // An unavailable new pod is counted against maxUnavailable + numUnavailable++ + } + default: + // This pod is old, it is an update candidate + switch { + case !k8sutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: time.Now()}): + // The old pod isn't available, so it needs to be replaced + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // Record the replacement + if allowedReplacementPods == nil { + allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods)) + } + allowedReplacementPods = append(allowedReplacementPods, oldPod.Name) + case numUnavailable >= maxUnavailable: + // No point considering any other candidates + continue + default: + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // Record the candidate + if candidatePodsToDelete == nil { + candidatePodsToDelete = make([]string, 0, maxUnavailable) + } + candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name) + } + } + } + // Use any of the candidates we can, including the allowedReplacemnntPods + klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, 
numUnavailable, len(candidatePodsToDelete)) + remainingUnavailable := maxUnavailable - numUnavailable + if remainingUnavailable < 0 { + remainingUnavailable = 0 + } + if max := len(candidatePodsToDelete); remainingUnavailable > max { + remainingUnavailable = max + } + oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...) + + return c.syncPodsOnNodes(ds, oldPodsToDelete) +} + +// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes. +func (c *Controller) getNodesToDaemonPods(ds *appsv1.DaemonSet) (map[string][]*corev1.Pod, error) { + // Ignore adopt/orphan pod, just deal with pods in podLister + pods, err := GetDaemonsetPods(c.podLister, ds) + if err != nil { + return nil, err + } + + // Group Pods by Node name. + nodeToDaemonPods := make(map[string][]*corev1.Pod) + for _, pod := range pods { + nodeName, err := GetTargetNodeName(pod) + if err != nil { + klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v", + pod.Namespace, pod.Name, ds.Namespace, ds.Name) + continue + } + + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod) + } + + return nodeToDaemonPods, nil +} + +// syncPodsOnNodes deletes pods on the given nodes. +// returns slice with errors if any. +func (c *Controller) syncPodsOnNodes(ds *appsv1.DaemonSet, podsToDelete []string) error { + // We need to set expectations before deleting pods to avoid race conditions. + dsKey, err := cache.MetaNamespaceKeyFunc(ds) + if err != nil { + return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) + } + + deleteDiff := len(podsToDelete) + + if deleteDiff > BurstReplicas { + deleteDiff = BurstReplicas + } + + c.expectations.SetExpectations(dsKey, 0, deleteDiff) + + // Error channel to communicate back failures, make the buffer big enough to avoid any blocking + errCh := make(chan error, deleteDiff) + + // Delete pods process + klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff) + deleteWait := sync.WaitGroup{} + deleteWait.Add(deleteDiff) + for i := 0; i < deleteDiff; i++ { + go func(ix int) { + defer deleteWait.Done() + if err := c.podControl.DeletePod(context.TODO(), ds.Namespace, podsToDelete[ix], ds); err != nil { + c.expectations.DeletionObserved(dsKey) + if !apierrors.IsNotFound(err) { + klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name) + errCh <- err + utilruntime.HandleError(err) + } + } + klog.Infof("Auto update pod %v/%v", ds.Name, podsToDelete[ix]) + }(i) + } + deleteWait.Wait() + + // Collect errors if any for proper reporting/retry logic in the controller + errors := []error{} + close(errCh) + for err := range errCh { + errors = append(errors, err) + } + return utilerrors.NewAggregate(errors) +} + +// maxUnavailableCounts calculates the true number of allowed unavailable +func (c *Controller) maxUnavailableCounts(ds *appsv1.DaemonSet, nodeToDaemonPods map[string][]*corev1.Pod) (int, error) { + // If annotation is not set, use default value one + v, ok := ds.Annotations[MaxUnavailableAnnotation] + if !ok { + return 1, nil + } + + intstrv := intstrutil.Parse(v) + maxUnavailable, err := intstrutil.GetScaledValueFromIntOrPercent(&intstrv, len(nodeToDaemonPods), true) + if err != nil { + return -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err) + } + + // If the daemonset returned with an impossible configuration, obey the default of unavailable=1 + if maxUnavailable == 0 { + 
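+		// A computed maxUnavailable of 0 would stall the rollout entirely, so fall
+		// back to the default of 1 instead.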
klog.Warningf("DaemonSet %s/%s is not configured for unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name) + maxUnavailable = 1 + } + klog.V(5).Infof("DaemonSet %s/%s, maxUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable) + return maxUnavailable, nil +} + +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the correct Kind. +func (c *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *appsv1.DaemonSet { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { + return nil + } + ds, err := c.daemonsetLister.DaemonSets(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if ds.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return ds +} diff --git a/pkg/controller/daemonpodupdater/daemon_pod_updater_controller_test.go b/pkg/controller/daemonpodupdater/daemon_pod_updater_controller_test.go new file mode 100644 index 00000000000..83441b949a0 --- /dev/null +++ b/pkg/controller/daemonpodupdater/daemon_pod_updater_controller_test.go @@ -0,0 +1,514 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package daemonpodupdater + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apiserver/pkg/storage/names" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + + k8sutil "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater/kubernetes" +) + +const ( + DefaultMaxUnavailable = "1" + CoupleMaxUnavailable = "2" +) + +var ( + simpleDaemonSetLabel = map[string]string{"foo": "bar"} + alwaysReady = func() bool { return true } +) + +// ---------------------------------------------------------------------------------------------------------------- +// ----------------------------------------------------new Object-------------------------------------------------- +// ---------------------------------------------------------------------------------------------------------------- + +func newDaemonSet(name string, img string) *appsv1.DaemonSet { + two := int32(2) + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: appsv1.DaemonSetSpec{ + RevisionHistoryLimit: &two, + Selector: &metav1.LabelSelector{MatchLabels: simpleDaemonSetLabel}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: simpleDaemonSetLabel, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Image: img}}, + }, + }, + }, + } +} + +func newPod(podName string, nodeName string, label map[string]string, ds *appsv1.DaemonSet) *corev1.Pod { + // Add hash unique label to the pod + newLabels := label + var podSpec corev1.PodSpec + // Copy pod spec from DaemonSet template, or use a default one if DaemonSet is nil + if ds != nil { + hash := k8sutil.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount) + newLabels = CloneAndAddLabel(label, appsv1.DefaultDaemonSetUniqueLabelKey, hash) + podSpec = ds.Spec.Template.Spec + } else { + podSpec = corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "foo/bar", + TerminationMessagePath: corev1.TerminationMessagePathDefault, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + } + } + + // Add node name to the pod + if len(nodeName) > 0 { + podSpec.NodeName = nodeName + } + + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: podName, + Labels: newLabels, + Namespace: metav1.NamespaceDefault, + }, + Spec: podSpec, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + pod.Name = names.SimpleNameGenerator.GenerateName(podName) + if ds != nil { + pod.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)} + } + return pod +} + +func newNode(name string, ready bool) *corev1.Node { + cond := corev1.NodeCondition{ + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + } + if !ready { + cond.Status = corev1.ConditionFalse + } + + return &corev1.Node{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceNone, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + cond, + }, + Allocatable: 
corev1.ResourceList{
+				corev1.ResourcePods: resource.MustParse("100"),
+			},
+		},
+	}
+}
+
+// ----------------------------------------------------------------------------------------------------------------
+// --------------------------------------------------fakeController------------------------------------------------
+// ----------------------------------------------------------------------------------------------------------------
+type fakeController struct {
+	*Controller
+
+	dsStore   cache.Store
+	nodeStore cache.Store
+	podStore  cache.Store
+}
+
+// ----------------------------------------------------------------------------------------------------------------
+// --------------------------------------------------fakePodControl------------------------------------------------
+// ----------------------------------------------------------------------------------------------------------------
+type fakePodControl struct {
+	sync.Mutex
+	*k8sutil.FakePodControl
+	podStore     cache.Store
+	podIDMap     map[string]*corev1.Pod
+	expectations k8sutil.ControllerExpectationsInterface
+}
+
+func newFakePodControl() *fakePodControl {
+	podIDMap := make(map[string]*corev1.Pod)
+	return &fakePodControl{
+		FakePodControl: &k8sutil.FakePodControl{},
+		podIDMap:       podIDMap,
+	}
+}
+
+func (f *fakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
+	f.Lock()
+	defer f.Unlock()
+	if err := f.FakePodControl.DeletePod(ctx, namespace, podID, object); err != nil {
+		return fmt.Errorf("failed to delete pod %q", podID)
+	}
+	pod, ok := f.podIDMap[podID]
+	if !ok {
+		return fmt.Errorf("pod %q does not exist", podID)
+	}
+	f.podStore.Delete(pod)
+	delete(f.podIDMap, podID)
+
+	ds := object.(*appsv1.DaemonSet)
+	dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
+	f.expectations.DeletionObserved(dsKey)
+
+	return nil
+}
+
+func newTest(initialObjects ...runtime.Object) (*fakeController, *fakePodControl) {
+	clientset := fake.NewSimpleClientset(initialObjects...)
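+	// The informer stores below are populated directly by the tests and the
+	// HasSynced funcs are stubbed out with alwaysReady, so the informers are
+	// never actually started.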
+	informerFactory := informers.NewSharedInformerFactory(clientset, 0)
+
+	c := NewController(
+		clientset,
+		informerFactory.Apps().V1().DaemonSets(),
+		informerFactory.Core().V1().Nodes(),
+		informerFactory.Core().V1().Pods(),
+	)
+
+	c.daemonsetSynced = alwaysReady
+	c.nodeSynced = alwaysReady
+	c.podSynced = alwaysReady
+
+	podControl := newFakePodControl()
+	c.podControl = podControl
+	podControl.podStore = informerFactory.Core().V1().Pods().Informer().GetStore()
+
+	fakeCtrl := &fakeController{
+		c,
+		informerFactory.Apps().V1().DaemonSets().Informer().GetStore(),
+		informerFactory.Core().V1().Nodes().Informer().GetStore(),
+		informerFactory.Core().V1().Pods().Informer().GetStore(),
+	}
+
+	podControl.expectations = c.expectations
+	return fakeCtrl, podControl
+}
+
+// ----------------------------------------------------------------------------------------------------------------
+// --------------------------------------------------Expectations--------------------------------------------------
+// ----------------------------------------------------------------------------------------------------------------
+
+func expectSyncDaemonSets(t *testing.T, tcase tCase, fakeCtrl *fakeController, ds *appsv1.DaemonSet,
+	podControl *fakePodControl, expectedDeletes int) {
+	key, err := cache.MetaNamespaceKeyFunc(ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	intstrv := intstrutil.Parse(tcase.maxUnavailable)
+	maxUnavailable, err := intstrutil.GetScaledValueFromIntOrPercent(&intstrv, tcase.nodeNum, true)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Execute test case
+	round := expectedDeletes / maxUnavailable
+	for round >= 0 {
+		err = fakeCtrl.syncHandler(key)
+		if err != nil {
+			t.Fatalf("Test %q did not pass, got syncDaemonsetHandler error %v", tcase.name, err)
+		}
+		round--
+	}
+
+	// Validate deleted pods number
+	if !tcase.wantDelete {
+		return
+	}
+
+	err = validateSyncDaemonSets(podControl, expectedDeletes)
+	if err != nil {
+		t.Fatalf("Test %q did not pass, %v", tcase.name, err)
+	}
+}
+
+// ----------------------------------------------------------------------------------------------------------------
+// -------------------------------------------------------util-----------------------------------------------------
+// ----------------------------------------------------------------------------------------------------------------
+
+func setAutoUpdateAnnotation(ds *appsv1.DaemonSet) {
+	metav1.SetMetaDataAnnotation(&ds.ObjectMeta, UpdateAnnotation, AutoUpdate)
+}
+
+func setOTAUpdateAnnotation(ds *appsv1.DaemonSet) {
+	metav1.SetMetaDataAnnotation(&ds.ObjectMeta, UpdateAnnotation, OTAUpdate)
+}
+
+func setMaxUnavailableAnnotation(ds *appsv1.DaemonSet, v string) {
+	metav1.SetMetaDataAnnotation(&ds.ObjectMeta, MaxUnavailableAnnotation, v)
+}
+
+func setOnDelete(ds *appsv1.DaemonSet) {
+	ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
+		Type: appsv1.OnDeleteDaemonSetStrategyType,
+	}
+}
+
+// validateSyncDaemonSets checks whether the number of deleted pods meets expectations
+func validateSyncDaemonSets(fakePodControl *fakePodControl, expectedDeletes int) error {
+	if len(fakePodControl.DeletePodName) != expectedDeletes {
+		return fmt.Errorf("Unexpected number of deletes.
Expected %d, got %v\n", expectedDeletes, fakePodControl.DeletePodName) + } + return nil +} + +func addNodesWithPods(fakeCtrl *fakeController, f *fakePodControl, startIndex, numNodes int, ds *appsv1.DaemonSet, ready bool) ([]*corev1.Node, error) { + nodes := make([]*corev1.Node, 0) + + for i := startIndex; i < startIndex+numNodes; i++ { + var nodeName string + switch ready { + case true: + nodeName = fmt.Sprintf("node-ready-%d", i) + case false: + nodeName = fmt.Sprintf("node-not-ready-%d", i) + } + + node := newNode(nodeName, ready) + err := fakeCtrl.nodeStore.Add(node) + if err != nil { + return nil, err + } + nodes = append(nodes, node) + + podPrefix := fmt.Sprintf("pod-%d", i) + pod := newPod(podPrefix, nodeName, simpleDaemonSetLabel, ds) + err = fakeCtrl.podStore.Add(pod) + if err != nil { + return nil, err + } + f.podIDMap[pod.Name] = pod + } + return nodes, nil +} + +// ---------------------------------------------------------------------------------------------------------------- +// ----------------------------------------------------Test Cases-------------------------------------------------- +// ---------------------------------------------------------------------------------------------------------------- + +type tCase struct { + name string + onDelete bool + strategy string + nodeNum int + readyNodeNum int + maxUnavailable string + turnReady bool + wantDelete bool +} + +// DaemonSets should place onto NotReady nodes +func TestDaemonsetPodUpdater(t *testing.T) { + + tcases := []tCase{ + { + name: "failed with not OnDelete strategy", + onDelete: false, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 3, + maxUnavailable: DefaultMaxUnavailable, + turnReady: false, + wantDelete: false, + }, + { + name: "success", + onDelete: true, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 3, + maxUnavailable: DefaultMaxUnavailable, + turnReady: false, + wantDelete: true, + }, + { + name: "success with maxUnavailable is 2", + onDelete: true, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 3, + maxUnavailable: CoupleMaxUnavailable, + turnReady: false, + wantDelete: true, + }, + { + name: "success with maxUnavailable is 50%", + onDelete: true, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 3, + maxUnavailable: "50%", + turnReady: false, + wantDelete: true, + }, + { + name: "success with 1 node not-ready", + onDelete: true, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 2, + maxUnavailable: DefaultMaxUnavailable, + turnReady: false, + wantDelete: true, + }, + { + name: "success with 2 nodes not-ready", + onDelete: true, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 1, + maxUnavailable: DefaultMaxUnavailable, + turnReady: false, + wantDelete: true, + }, + { + name: "success with 2 nodes not-ready, then turn ready", + onDelete: true, + strategy: "auto", + nodeNum: 3, + readyNodeNum: 1, + maxUnavailable: DefaultMaxUnavailable, + turnReady: true, + wantDelete: true, + }, + } + + for _, tcase := range tcases { + t.Logf("Current test case is %q", tcase.name) + ds := newDaemonSet("ds", "foo/bar:v1") + if tcase.onDelete { + setOnDelete(ds) + } + setMaxUnavailableAnnotation(ds, tcase.maxUnavailable) + switch tcase.strategy { + case AutoUpdate: + setAutoUpdateAnnotation(ds) + } + + fakeCtrl, podControl := newTest(ds) + + // add ready nodes and its pods + _, err := addNodesWithPods(fakeCtrl, podControl, 1, tcase.readyNodeNum, ds, true) + if err != nil { + t.Fatal(err) + } + + // add not-ready nodes and its pods + notReadyNodes, err := addNodesWithPods(fakeCtrl, podControl, 
tcase.readyNodeNum+1, tcase.nodeNum-tcase.readyNodeNum, ds, + false) + if err != nil { + t.Fatal(err) + } + + // Update daemonset specification + ds.Spec.Template.Spec.Containers[0].Image = "foo/bar:v2" + err = fakeCtrl.dsStore.Add(ds) + if err != nil { + t.Fatal(err) + } + + // Check test case + expectSyncDaemonSets(t, tcase, fakeCtrl, ds, podControl, tcase.readyNodeNum) + + if tcase.turnReady { + fakeCtrl.podControl.(*fakePodControl).Clear() + for _, node := range notReadyNodes { + node.Status.Conditions = []corev1.NodeCondition{ + {Type: corev1.NodeReady, Status: corev1.ConditionTrue}, + } + if err := fakeCtrl.nodeStore.Update(node); err != nil { + t.Fatal(err) + } + } + + expectSyncDaemonSets(t, tcase, fakeCtrl, ds, podControl, tcase.nodeNum-tcase.readyNodeNum) + } + } +} + +func TestOTAUpdate(t *testing.T) { + ds := newDaemonSet("ds", "foo/bar:v1") + setOTAUpdateAnnotation(ds) + + node := newNode("node", true) + oldPod := newPod("old-pod", node.Name, simpleDaemonSetLabel, ds) + ds.Spec.Template.Spec.Containers[0].Image = "foo/bar:v2" + newPod := newPod("new-pod", node.Name, simpleDaemonSetLabel, ds) + + fakeCtrl, _ := newTest(ds, oldPod, newPod, node) + + fakeCtrl.podStore.Add(oldPod) + fakeCtrl.podStore.Add(newPod) + fakeCtrl.dsStore.Add(ds) + fakeCtrl.nodeStore.Add(node) + + key, err := cache.MetaNamespaceKeyFunc(ds) + if err != nil { + t.Fatal(err) + } + if err = fakeCtrl.syncHandler(key); err != nil { + t.Fatalf("OTA test does not passed, got syncDaemonsetHandler error %v", err) + } + + // check whether ota PodNeedUpgrade condition set properly + oldPodGot, err := fakeCtrl.kubeclientset.CoreV1().Pods(ds.Namespace).Get(context.TODO(), oldPod.Name, + metav1.GetOptions{}) + if err != nil { + t.Errorf("get oldPod failed, %+v", err) + } + + newPodGot, err := fakeCtrl.kubeclientset.CoreV1().Pods(ds.Namespace).Get(context.TODO(), newPod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("get newPod failed, %+v", err) + } + + assert.Equal(t, true, IsPodUpdatable(oldPodGot)) + assert.Equal(t, false, IsPodUpdatable(newPodGot)) +} diff --git a/pkg/controller/daemonpodupdater/kubernetes/controller_utils.go b/pkg/controller/daemonpodupdater/kubernetes/controller_utils.go new file mode 100644 index 00000000000..f6422fd1b32 --- /dev/null +++ b/pkg/controller/daemonpodupdater/kubernetes/controller_utils.go @@ -0,0 +1,511 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +import ( + "context" + "encoding/binary" + "fmt" + "hash" + "hash/fnv" + "sync" + "sync/atomic" + "time" + + "github.com/davecgh/go-spew/spew" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const ( + // If a watch drops a delete event for a pod, it'll take this long + // before a dormant controller waiting for those packets is woken up anyway. It is + // specifically targeted at the case where some problem prevents an update + // of expectations, without it the controller could stay asleep forever. This should + // be set based on the expected latency of watch events. + // + // Currently a controller can service (create *and* observe the watch events for said + // creation) about 10 pods a second, so it takes about 1 min to service + // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s + // latency/pod at the scale of 3000 pods over 100 nodes. + ExpectationsTimeout = 5 * time.Minute +) + +var ( + KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc +) + +type ResyncPeriodFunc func() time.Duration + +// Expectations are a way for controllers to tell the controller manager what they expect. eg: +// ControllerExpectations: { +// controller1: expects 2 adds in 2 minutes +// controller2: expects 2 dels in 2 minutes +// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met +// } +// +// Implementation: +// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion +// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller +// +// * Once set expectations can only be lowered +// * A controller isn't synced till its expectations are either fulfilled, or expire +// * Controllers that don't set expectations will get woken up for every matching controllee + +// ExpKeyFunc to parse out the key from a ControlleeExpectation +var ExpKeyFunc = func(obj interface{}) (string, error) { + if e, ok := obj.(*ControlleeExpectations); ok { + return e.key, nil + } + return "", fmt.Errorf("could not find key for obj %#v", obj) +} + +// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations. +// Only abstracted out for testing. +// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different +// types of controllers, because the keys might conflict across types. 
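+// In this package, the daemonPodUpdater controller sets deletion expectations in
+// syncPodsOnNodes just before deleting pods, lowers them via DeletionObserved in
+// its pod delete handler, and gates each sync on SatisfiedExpectations.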
+type ControllerExpectationsInterface interface { + GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) + SatisfiedExpectations(controllerKey string) bool + DeleteExpectations(controllerKey string) + SetExpectations(controllerKey string, add, del int) error + ExpectCreations(controllerKey string, adds int) error + ExpectDeletions(controllerKey string, dels int) error + CreationObserved(controllerKey string) + DeletionObserved(controllerKey string) + RaiseExpectations(controllerKey string, add, del int) + LowerExpectations(controllerKey string, add, del int) +} + +// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync. +type ControllerExpectations struct { + cache.Store +} + +// GetExpectations returns the ControlleeExpectations of the given controller. +func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) { + exp, exists, err := r.GetByKey(controllerKey) + if err == nil && exists { + return exp.(*ControlleeExpectations), true, nil + } + return nil, false, err +} + +// DeleteExpectations deletes the expectations of the given controller from the TTLStore. +func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { + if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { + if err := r.Delete(exp); err != nil { + klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err) + } + } +} + +// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. +// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller +// manager. +func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { + if exp, exists, err := r.GetExpectations(controllerKey); exists { + if exp.Fulfilled() { + klog.V(4).Infof("Controller expectations fulfilled %#v", exp) + return true + } else if exp.isExpired() { + klog.V(4).Infof("Controller expectations expired %#v", exp) + return true + } else { + klog.V(4).Infof("Controller still waiting on expectations %#v", exp) + return false + } + } else if err != nil { + klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err) + } else { + // When a new controller is created, it doesn't have expectations. + // When it doesn't see expected watch events for > TTL, the expectations expire. + // - In this case it wakes up, creates/deletes controllees, and sets expectations again. + // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire. + // - In this case it continues without setting expectations till it needs to create/delete controllees. + klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey) + } + // Trigger a sync if we either encountered and error (which shouldn't happen since we're + // getting from local store) or this controller hasn't established expectations. + return true +} + +// TODO: Extend ExpirationCache to support explicit expiration. +// TODO: Make this possible to disable in tests. +// TODO: Support injection of clock. +func (exp *ControlleeExpectations) isExpired() bool { + return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout +} + +// SetExpectations registers new expectations for the given controller. Forgets existing expectations. 
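+// The timestamp recorded here is what isExpired later compares against
+// ExpectationsTimeout.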
+func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error { + exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()} + klog.V(4).Infof("Setting expectations %#v", exp) + return r.Add(exp) +} + +func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error { + return r.SetExpectations(controllerKey, adds, 0) +} + +func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error { + return r.SetExpectations(controllerKey, 0, dels) +} + +// Decrements the expectation counts of the given controller. +func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(-add), int64(-del)) + // The expectations might've been modified since the update on the previous line. + klog.V(4).Infof("Lowered expectations %#v", exp) + } +} + +// Increments the expectation counts of the given controller. +func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(add), int64(del)) + // The expectations might've been modified since the update on the previous line. + klog.V(4).Infof("Raised expectations %#v", exp) + } +} + +// CreationObserved atomically decrements the `add` expectation count of the given controller. +func (r *ControllerExpectations) CreationObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 1, 0) +} + +// DeletionObserved atomically decrements the `del` expectation count of the given controller. +func (r *ControllerExpectations) DeletionObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 0, 1) +} + +// ControlleeExpectations track controllee creates/deletes. +type ControlleeExpectations struct { + // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms + // See: https://golang.org/pkg/sync/atomic/ for more information + add int64 + del int64 + key string + timestamp time.Time +} + +// Add increments the add and del counters. +func (e *ControlleeExpectations) Add(add, del int64) { + atomic.AddInt64(&e.add, add) + atomic.AddInt64(&e.del, del) +} + +// Fulfilled returns true if this expectation has been fulfilled. +func (e *ControlleeExpectations) Fulfilled() bool { + // TODO: think about why this line being atomic doesn't matter + return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0 +} + +// GetExpectations returns the add and del expectations of the controllee. +func (e *ControlleeExpectations) GetExpectations() (int64, int64) { + return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del) +} + +// NewControllerExpectations returns a store for ControllerExpectations. +func NewControllerExpectations() *ControllerExpectations { + return &ControllerExpectations{cache.NewStore(ExpKeyFunc)} +} + +// Reasons for pod events +const ( + // FailedCreatePodReason is added in an event and in a replica set condition + // when a pod for a replica set is failed to be created. + FailedCreatePodReason = "FailedCreate" + // SuccessfulCreatePodReason is added in an event when a pod for a replica set + // is successfully created. 
+ SuccessfulCreatePodReason = "SuccessfulCreate" + // FailedDeletePodReason is added in an event and in a replica set condition + // when a pod for a replica set is failed to be deleted. + FailedDeletePodReason = "FailedDelete" + // SuccessfulDeletePodReason is added in an event when a pod for a replica set + // is successfully deleted. + SuccessfulDeletePodReason = "SuccessfulDelete" +) + +// ComputeHash returns a hash value calculated from pod template and +// a collisionCount to avoid hash collision. The hash will be safe encoded to +// avoid bad words. +func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string { + podTemplateSpecHasher := fnv.New32a() + DeepHashObject(podTemplateSpecHasher, *template) + + // Add collisionCount in the hash if it exists. + if collisionCount != nil { + collisionCountBytes := make([]byte, 8) + binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount)) + podTemplateSpecHasher.Write(collisionCountBytes) + } + + return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32())) +} + +// DeepHashObject writes specified object to hash using the spew library +// which follows pointers and prints actual values of the nested objects +// ensuring the hash does not change when a pointer changes. +func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) { + hasher.Reset() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hasher, "%#v", objectToWrite) +} + +// PodControlInterface is an interface that knows how to add or delete pods +// created as an interface to allow testing. +type PodControlInterface interface { + // CreatePods creates new pods according to the spec, and sets object as the pod's controller. + CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // CreatePodsWithGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName. + CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error + // DeletePod deletes the pod identified by podID. + DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error + // PatchPod patches the pod. + PatchPod(ctx context.Context, namespace, name string, data []byte) error +} + +// RealPodControl is the default implementation of PodControlInterface. 
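+// Of these methods, the daemonPodUpdater controller only calls DeletePod; the
+// create and patch helpers are carried over with the rest of this file from the
+// upstream Kubernetes controller utilities.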
+type RealPodControl struct { + KubeClient clientset.Interface + Recorder record.EventRecorder +} + +var _ PodControlInterface = &RealPodControl{} + +func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set { + desiredLabels := make(labels.Set) + for k, v := range template.Labels { + desiredLabels[k] = v + } + return desiredLabels +} + +func getPodsFinalizers(template *v1.PodTemplateSpec) []string { + desiredFinalizers := make([]string, len(template.Finalizers)) + copy(desiredFinalizers, template.Finalizers) + return desiredFinalizers +} + +func getPodsAnnotationSet(template *v1.PodTemplateSpec) labels.Set { + desiredAnnotations := make(labels.Set) + for k, v := range template.Annotations { + desiredAnnotations[k] = v + } + return desiredAnnotations +} + +func getPodsPrefix(controllerName string) string { + // use the dash (if the name isn't too long) to make the pod name a bit prettier + prefix := fmt.Sprintf("%s-", controllerName) + if len(apimachineryvalidation.NameIsDNSSubdomain(prefix, true)) != 0 { + prefix = controllerName + } + return prefix +} + +func validateControllerRef(controllerRef *metav1.OwnerReference) error { + if controllerRef == nil { + return fmt.Errorf("controllerRef is nil") + } + if len(controllerRef.APIVersion) == 0 { + return fmt.Errorf("controllerRef has empty APIVersion") + } + if len(controllerRef.Kind) == 0 { + return fmt.Errorf("controllerRef has empty Kind") + } + if controllerRef.Controller == nil || !*controllerRef.Controller { + return fmt.Errorf("controllerRef.Controller is not set to true") + } + if controllerRef.BlockOwnerDeletion == nil || !*controllerRef.BlockOwnerDeletion { + return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set") + } + return nil +} + +func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { + return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "") +} + +func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } + pod, err := GetPodFromTemplate(template, controllerObject, controllerRef) + if err != nil { + return err + } + if len(generateName) > 0 { + pod.ObjectMeta.GenerateName = generateName + } + return r.createPods(ctx, namespace, pod, controllerObject) +} + +func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error { + _, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) + return err +} + +func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error) { + desiredLabels := getPodsLabelSet(template) + desiredFinalizers := getPodsFinalizers(template) + desiredAnnotations := getPodsAnnotationSet(template) + accessor, err := meta.Accessor(parentObject) + if err != nil { + return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err) + } + prefix := getPodsPrefix(accessor.GetName()) + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: desiredLabels, + Annotations: desiredAnnotations, + GenerateName: prefix, + Finalizers: desiredFinalizers, + }, + } + if controllerRef != nil { + pod.OwnerReferences = 
append(pod.OwnerReferences, *controllerRef) + } + pod.Spec = *template.Spec.DeepCopy() + return pod, nil +} + +func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error { + if len(labels.Set(pod.Labels)) == 0 { + return fmt.Errorf("unable to create pods, no labels") + } + newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + // only send an event if the namespace isn't terminating + if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { + r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err) + } + return err + } + accessor, err := meta.Accessor(object) + if err != nil { + klog.Errorf("parentObject does not have ObjectMeta, %v", err) + return nil + } + klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) + r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name) + + return nil +} + +func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error { + accessor, err := meta.Accessor(object) + if err != nil { + return fmt.Errorf("object does not have ObjectMeta, %v", err) + } + klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID)) + if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID) + return err + } + r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) + return fmt.Errorf("unable to delete pods: %v", err) + } + r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID) + + return nil +} + +type FakePodControl struct { + sync.Mutex + Templates []v1.PodTemplateSpec + ControllerRefs []metav1.OwnerReference + DeletePodName []string + Patches [][]byte + Err error + CreateLimit int + CreateCallCount int +} + +var _ PodControlInterface = &FakePodControl{} + +func (f *FakePodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error { + f.Lock() + defer f.Unlock() + f.Patches = append(f.Patches, data) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + return f.CreatePodsWithGenerateName(ctx, namespace, spec, object, controllerRef, "") +} + +func (f *FakePodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + spec.GenerateName = generateNamePrefix + f.Templates = append(f.Templates, *spec) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.DeletePodName = append(f.DeletePodName, podID) + if f.Err != nil { + return f.Err + } + 
return nil +} + +func (f *FakePodControl) Clear() { + f.Lock() + defer f.Unlock() + f.DeletePodName = []string{} + f.Templates = []v1.PodTemplateSpec{} + f.ControllerRefs = []metav1.OwnerReference{} + f.Patches = [][]byte{} + f.CreateLimit = 0 + f.CreateCallCount = 0 +} diff --git a/pkg/controller/daemonpodupdater/kubernetes/controller_utils_test.go b/pkg/controller/daemonpodupdater/kubernetes/controller_utils_test.go new file mode 100644 index 00000000000..406141f8daf --- /dev/null +++ b/pkg/controller/daemonpodupdater/kubernetes/controller_utils_test.go @@ -0,0 +1,282 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "encoding/json" + "fmt" + "math" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/uuid" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + clientscheme "k8s.io/client-go/kubernetes/scheme" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + utiltesting "k8s.io/client-go/util/testing" +) + +// NewFakeControllerExpectationsLookup creates a fake store for PodExpectations. 
+func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *clock.FakeClock) { + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + fakeClock := clock.NewFakeClock(fakeTime) + ttlPolicy := &cache.TTLPolicy{TTL: ttl, Clock: fakeClock} + ttlStore := cache.NewFakeExpirationStore( + ExpKeyFunc, nil, ttlPolicy, fakeClock) + return &ControllerExpectations{ttlStore}, fakeClock +} + +func newReplicationController(replicas int) *v1.ReplicationController { + rc := &v1.ReplicationController{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: v1.ReplicationControllerSpec{ + Replicas: func() *int32 { i := int32(replicas); return &i }(), + Selector: map[string]string{"foo": "bar"}, + Template: &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "name": "foo", + "type": "production", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: "foo/bar", + TerminationMessagePath: v1.TerminationMessagePathDefault, + ImagePullPolicy: v1.PullIfNotPresent, + }, + }, + RestartPolicy: v1.RestartPolicyAlways, + DNSPolicy: v1.DNSDefault, + NodeSelector: map[string]string{ + "baz": "blah", + }, + }, + }, + }, + } + return rc +} + +func TestControllerExpectations(t *testing.T) { + ttl := 30 * time.Second + e, fakeClock := NewFakeControllerExpectationsLookup(ttl) + // In practice we can't really have add and delete expectations since we only either create or + // delete replicas in one rc pass, and the rc goes to sleep soon after until the expectations are + // either fulfilled or timeout. + adds, dels := 10, 30 + rc := newReplicationController(1) + + // RC fires off adds and deletes at apiserver, then sets expectations + rcKey, err := KeyFunc(rc) + assert.NoError(t, err, "Couldn't get key for object %#v: %v", rc, err) + + e.SetExpectations(rcKey, adds, dels) + var wg sync.WaitGroup + for i := 0; i < adds+1; i++ { + wg.Add(1) + go func() { + // In prod this can happen either because of a failed create by the rc + // or after having observed a create via informer + e.CreationObserved(rcKey) + wg.Done() + }() + } + wg.Wait() + + // There are still delete expectations + assert.False(t, e.SatisfiedExpectations(rcKey), "Rc will sync before expectations are met") + + for i := 0; i < dels+1; i++ { + wg.Add(1) + go func() { + e.DeletionObserved(rcKey) + wg.Done() + }() + } + wg.Wait() + + // Expectations have been surpassed + podExp, exists, err := e.GetExpectations(rcKey) + assert.NoError(t, err, "Could not get expectations for rc, exists %v and err %v", exists, err) + assert.True(t, exists, "Could not get expectations for rc, exists %v and err %v", exists, err) + + add, del := podExp.GetExpectations() + assert.Equal(t, int64(-1), add, "Unexpected pod expectations %#v", podExp) + assert.Equal(t, int64(-1), del, "Unexpected pod expectations %#v", podExp) + assert.True(t, e.SatisfiedExpectations(rcKey), "Expectations are met but the rc will not sync") + + // Next round of rc sync, old expectations are cleared + e.SetExpectations(rcKey, 1, 2) + podExp, exists, err = e.GetExpectations(rcKey) + assert.NoError(t, err, "Could not get expectations for rc, exists %v and err %v", exists, err) + assert.True(t, exists, "Could not get expectations for rc, exists %v and err %v", exists, err) + add, del = podExp.GetExpectations() + + assert.Equal(t, int64(1), add, "Unexpected pod expectations %#v", 
podExp) + assert.Equal(t, int64(2), del, "Unexpected pod expectations %#v", podExp) + + // Expectations have expired because of ttl + fakeClock.Step(ttl + 1) + assert.True(t, e.SatisfiedExpectations(rcKey), + "Expectations should have expired but didn't") +} + +func TestCreatePods(t *testing.T) { + ns := metav1.NamespaceDefault + body := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "empty_pod"}}) + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + + podControl := RealPodControl{ + KubeClient: clientset, + Recorder: &record.FakeRecorder{}, + } + + controllerSpec := newReplicationController(1) + controllerRef := metav1.NewControllerRef(controllerSpec, v1.SchemeGroupVersion.WithKind("ReplicationController")) + + // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template + err := podControl.CreatePods(context.TODO(), ns, controllerSpec.Spec.Template, controllerSpec, controllerRef) + assert.NoError(t, err, "unexpected error: %v", err) + + expectedPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: controllerSpec.Spec.Template.Labels, + GenerateName: fmt.Sprintf("%s-", controllerSpec.Name), + }, + Spec: controllerSpec.Spec.Template.Spec, + } + fakeHandler.ValidateRequest(t, "/api/v1/namespaces/default/pods", "POST", nil) + var actualPod = &v1.Pod{} + err = json.Unmarshal([]byte(fakeHandler.RequestBody), actualPod) + assert.NoError(t, err, "unexpected error: %v", err) + assert.True(t, apiequality.Semantic.DeepDerivative(&expectedPod, actualPod), + "Body: %s", fakeHandler.RequestBody) +} + +func TestCreatePodsWithGenerateName(t *testing.T) { + ns := metav1.NamespaceDefault + body := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "empty_pod"}}) + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + + podControl := RealPodControl{ + KubeClient: clientset, + Recorder: &record.FakeRecorder{}, + } + + controllerSpec := newReplicationController(1) + controllerRef := metav1.NewControllerRef(controllerSpec, v1.SchemeGroupVersion.WithKind("ReplicationController")) + + // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template + generateName := "hello-" + err := podControl.CreatePodsWithGenerateName(context.TODO(), ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName) + assert.NoError(t, err, "unexpected error: %v", err) + + expectedPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: controllerSpec.Spec.Template.Labels, + GenerateName: generateName, + OwnerReferences: []metav1.OwnerReference{*controllerRef}, + }, + Spec: controllerSpec.Spec.Template.Spec, + } + + fakeHandler.ValidateRequest(t, "/api/v1/namespaces/default/pods", "POST", nil) + var actualPod = &v1.Pod{} + err = json.Unmarshal([]byte(fakeHandler.RequestBody), actualPod) + 
assert.NoError(t, err, "unexpected error: %v", err) + assert.True(t, apiequality.Semantic.DeepDerivative(&expectedPod, actualPod), + "Body: %s", fakeHandler.RequestBody) +} + +func TestDeletePodsAllowsMissing(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + podControl := RealPodControl{ + KubeClient: fakeClient, + Recorder: &record.FakeRecorder{}, + } + + controllerSpec := newReplicationController(1) + + err := podControl.DeletePod(context.TODO(), "namespace-name", "podName", controllerSpec) + assert.True(t, apierrors.IsNotFound(err)) +} + +func TestComputeHash(t *testing.T) { + collisionCount := int32(1) + otherCollisionCount := int32(2) + maxCollisionCount := int32(math.MaxInt32) + tests := []struct { + name string + template *v1.PodTemplateSpec + collisionCount *int32 + otherCollisionCount *int32 + }{ + { + name: "simple", + template: &v1.PodTemplateSpec{}, + collisionCount: &collisionCount, + otherCollisionCount: &otherCollisionCount, + }, + { + name: "using math.MaxInt64", + template: &v1.PodTemplateSpec{}, + collisionCount: nil, + otherCollisionCount: &maxCollisionCount, + }, + } + + for _, test := range tests { + hash := ComputeHash(test.template, test.collisionCount) + otherHash := ComputeHash(test.template, test.otherCollisionCount) + + assert.NotEqual(t, hash, otherHash, "expected different hashes but got the same: %d", hash) + } +} diff --git a/pkg/controller/daemonpodupdater/kubernetes/pod_util.go b/pkg/controller/daemonpodupdater/kubernetes/pod_util.go new file mode 100644 index 00000000000..dd50ab3bfaa --- /dev/null +++ b/pkg/controller/daemonpodupdater/kubernetes/pod_util.go @@ -0,0 +1,83 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// IsPodAvailable returns true if a pod is available; false otherwise. +// Precondition for an available pod is that it must be ready. On top +// of that, there are two cases when a pod can be considered available: +// 1. minReadySeconds == 0, or +// 2. LastTransitionTime (is set) + minReadySeconds < current time +func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool { + if !IsPodReady(pod) { + return false + } + + c := GetPodReadyCondition(pod.Status) + minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second + if minReadySeconds == 0 || (!c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time)) { + return true + } + return false +} + +// IsPodReady returns true if a pod is ready; false otherwise. +func IsPodReady(pod *v1.Pod) bool { + return IsPodReadyConditionTrue(pod.Status) +} + +// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise. 
+func IsPodReadyConditionTrue(status v1.PodStatus) bool { + condition := GetPodReadyCondition(status) + return condition != nil && condition.Status == v1.ConditionTrue +} + +// GetPodReadyCondition extracts the pod ready condition from the given status and returns that. +// Returns nil if the condition is not present. +func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition { + _, condition := GetPodCondition(&status, v1.PodReady) + return condition +} + +// GetPodCondition extracts the provided condition from the given status and returns that. +// Returns nil and -1 if the condition is not present, and the index of the located condition. +func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if status == nil { + return -1, nil + } + return GetPodConditionFromList(status.Conditions, conditionType) +} + +// GetPodConditionFromList extracts the provided condition from the given list of condition and +// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present. +func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} diff --git a/pkg/controller/daemonpodupdater/kubernetes/pod_util_test.go b/pkg/controller/daemonpodupdater/kubernetes/pod_util_test.go new file mode 100644 index 00000000000..e9d55223cb2 --- /dev/null +++ b/pkg/controller/daemonpodupdater/kubernetes/pod_util_test.go @@ -0,0 +1,80 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +import ( + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func newPod(now metav1.Time, ready bool, beforeSec int) *v1.Pod { + conditionStatus := v1.ConditionFalse + if ready { + conditionStatus = v1.ConditionTrue + } + return &v1.Pod{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + LastTransitionTime: metav1.NewTime(now.Time.Add(-1 * time.Duration(beforeSec) * time.Second)), + Status: conditionStatus, + }, + }, + }, + } +} + +func TestIsPodAvailable(t *testing.T) { + now := metav1.Now() + tests := []struct { + pod *v1.Pod + minReadySeconds int32 + expected bool + }{ + { + pod: newPod(now, false, 0), + minReadySeconds: 0, + expected: false, + }, + { + pod: newPod(now, true, 0), + minReadySeconds: 1, + expected: false, + }, + { + pod: newPod(now, true, 0), + minReadySeconds: 0, + expected: true, + }, + { + pod: newPod(now, true, 51), + minReadySeconds: 50, + expected: true, + }, + } + + for i, test := range tests { + isAvailable := IsPodAvailable(test.pod, test.minReadySeconds, now) + if isAvailable != test.expected { + t.Errorf("[tc #%d] expected available pod: %t, got: %t", i, test.expected, isAvailable) + } + } +} diff --git a/pkg/controller/daemonpodupdater/util.go b/pkg/controller/daemonpodupdater/util.go new file mode 100644 index 00000000000..75b08db65a4 --- /dev/null +++ b/pkg/controller/daemonpodupdater/util.go @@ -0,0 +1,248 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package daemonpodupdater
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	extensions "k8s.io/api/extensions/v1beta1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	client "k8s.io/client-go/kubernetes"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"
+
+	k8sutil "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater/kubernetes"
+	util "github.com/openyurtio/openyurt/pkg/controller/util/node"
+)
+
+// GetDaemonsetPods gets all pods that belong to the given daemonset
+func GetDaemonsetPods(podLister corelisters.PodLister, ds *appsv1.DaemonSet) ([]*corev1.Pod, error) {
+	dsPods := make([]*corev1.Pod, 0)
+	dsPodsNames := make([]string, 0)
+	pods, err := podLister.Pods(ds.Namespace).List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+
+	for i, pod := range pods {
+		owner := metav1.GetControllerOf(pod)
+		if owner == nil {
+			continue
+		}
+		if owner.UID == ds.UID {
+			dsPods = append(dsPods, pods[i])
+			dsPodsNames = append(dsPodsNames, pod.Name)
+		}
+	}
+
+	if len(dsPods) > 0 {
+		klog.V(4).Infof("Daemonset %v has pods %v", ds.Name, dsPodsNames)
+	}
+	return dsPods, nil
+}
+
+// IsDaemonsetPodLatest checks whether the pod is running the daemonset's latest template,
+// by comparing the pod's template-generation and template-hash labels with the daemonset's.
+// It returns true if the pod is up to date and false otherwise.
+func IsDaemonsetPodLatest(ds *appsv1.DaemonSet, pod *corev1.Pod) bool {
+	hash := k8sutil.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)
+	klog.V(4).Infof("compute hash: %v", hash)
+	generation, err := GetTemplateGeneration(ds)
+	if err != nil {
+		generation = nil
+	}
+
+	klog.V(5).Infof("daemonset %v revision hash is %v", ds.Name, hash)
+	klog.V(5).Infof("daemonset %v generation is %v", ds.Name, generation)
+
+	// Dereference generation so the label is compared with the numeric value rather than the pointer address.
+	templateMatches := generation != nil && pod.Labels[extensions.DaemonSetTemplateGenerationKey] == fmt.Sprint(*generation)
+	hashMatches := len(hash) > 0 && pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey] == hash
+	return hashMatches || templateMatches
+}
+
+// GetTemplateGeneration gets the annotation "deprecated.daemonset.template.generation" of the given daemonset
+func GetTemplateGeneration(ds *appsv1.DaemonSet) (*int64, error) {
+	annotation, found := ds.Annotations[appsv1.DeprecatedTemplateGeneration]
+	if !found {
+		return nil, nil
+	}
+	generation, err := strconv.ParseInt(annotation, 10, 64)
+	if err != nil {
+		return nil, err
+	}
+	return &generation, nil
+}
+
+// NodeReadyByName checks whether the node with the given name is ready
+func NodeReadyByName(nodeList corelisters.NodeLister, nodeName string) (bool, error) {
+	node, err := nodeList.Get(nodeName)
+	if err != nil {
+		return false, err
+	}
+
+	return NodeReady(&node.Status), nil
+}
+
+// NodeReady checks whether the given node status reports the node as ready
+func NodeReady(nodeStatus *corev1.NodeStatus) bool {
+	for _, cond := range nodeStatus.Conditions {
+		if cond.Type == corev1.NodeReady {
+			return cond.Status == corev1.ConditionTrue
+		}
+	}
+	return false
+}
+
+// SetPodUpgradeCondition calculates and sets the pod condition "PodNeedUpgrade"
+func SetPodUpgradeCondition(clientset client.Interface, ds *appsv1.DaemonSet, pod *corev1.Pod) error {
+	isLatest := IsDaemonsetPodLatest(ds, pod)
+
+	// Comply with K8s and use the ConditionTrue/ConditionFalse constants: the condition is
+	// true when the pod is not running the latest template and therefore needs to be upgraded.
+	var status corev1.ConditionStatus
+	if isLatest {
+		status = corev1.ConditionFalse
+	} else {
+		status = corev1.ConditionTrue
+	}
+
+	cond := &corev1.PodCondition{
+		Type:   PodNeedUpgrade,
+		Status: status,
+	}
+	if change := util.UpdatePodCondition(&pod.Status, cond); change {
+		if _, err := clientset.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
+			return err
+		}
+		klog.Infof("set pod %q condition PodNeedUpgrade to %v", pod.Name, !isLatest)
+	}
+
+	return nil
+}
+
+// checkPrerequisites checks that the daemonset meets two prerequisites:
+// 1. the annotation "apps.openyurt.io/update-strategy" is set to "auto" or "ota"
+// 2. the update strategy is "OnDelete"
+func checkPrerequisites(ds *appsv1.DaemonSet) bool {
+	v, ok := ds.Annotations[UpdateAnnotation]
+	if !ok || (v != AutoUpdate && v != OTAUpdate) {
+		return false
+	}
+	return ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType
+}
+
+// CloneAndAddLabel clones the given map and returns a new map with the given key and value added.
+// Returns the given map, if labelKey is empty.
+func CloneAndAddLabel(labels map[string]string, labelKey, labelValue string) map[string]string {
+	if labelKey == "" {
+		// Don't need to add a label.
+		return labels
+	}
+	// Clone.
+	newLabels := map[string]string{}
+	for key, value := range labels {
+		newLabels[key] = value
+	}
+	newLabels[labelKey] = labelValue
+	return newLabels
+}
+
+// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there
+// is at most one of each old and new pods, or false if there are multiples. We can skip
+// processing the particular node in those scenarios and let the manage loop prune the
+// excess pods for our next time around.
+func findUpdatedPodsOnNode(ds *appsv1.DaemonSet, podsOnNode []*corev1.Pod) (newPod, oldPod *corev1.Pod, ok bool) {
+	for _, pod := range podsOnNode {
+		if pod.DeletionTimestamp != nil {
+			continue
+		}
+
+		if IsDaemonsetPodLatest(ds, pod) {
+			if newPod != nil {
+				return nil, nil, false
+			}
+			newPod = pod
+		} else {
+			if oldPod != nil {
+				return nil, nil, false
+			}
+			oldPod = pod
+		}
+	}
+	return newPod, oldPod, true
+}
+
+// GetTargetNodeName gets the target node name of a DaemonSet pod. If `.spec.NodeName` is not empty,
+// it is returned directly; otherwise, the node name of the pending pod is retrieved from its NodeAffinity.
+// An error is returned if the node name cannot be determined from either source.
+func GetTargetNodeName(pod *corev1.Pod) (string, error) { + if len(pod.Spec.NodeName) != 0 { + return pod.Spec.NodeName, nil + } + + // Retrieve node name of unscheduled pods from NodeAffinity + if pod.Spec.Affinity == nil || + pod.Spec.Affinity.NodeAffinity == nil || + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + return "", fmt.Errorf("no spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution for pod %s/%s", + pod.Namespace, pod.Name) + } + + terms := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + if len(terms) < 1 { + return "", fmt.Errorf("no nodeSelectorTerms in requiredDuringSchedulingIgnoredDuringExecution of pod %s/%s", + pod.Namespace, pod.Name) + } + + for _, term := range terms { + for _, exp := range term.MatchFields { + if exp.Key == metav1.ObjectNameField && + exp.Operator == corev1.NodeSelectorOpIn { + if len(exp.Values) != 1 { + return "", fmt.Errorf("the matchFields value of '%s' is not unique for pod %s/%s", + metav1.ObjectNameField, pod.Namespace, pod.Name) + } + + return exp.Values[0], nil + } + } + } + + return "", fmt.Errorf("no node name found for pod %s/%s", pod.Namespace, pod.Name) +} + +// IsPodUpdatable returns true if a pod is updatable; false otherwise. +func IsPodUpdatable(pod *corev1.Pod) bool { + return IsPodUpgradeConditionTrue(pod.Status) +} + +// IsPodUpgradeConditionTrue returns true if a pod is updatable; false otherwise. +func IsPodUpgradeConditionTrue(status corev1.PodStatus) bool { + condition := GetPodUpgradeCondition(status) + return condition != nil && condition.Status == corev1.ConditionTrue +} + +// GetPodUpgradeCondition extracts the pod upgrade condition from the given status and returns that. +// Returns nil if the condition is not present. +func GetPodUpgradeCondition(status corev1.PodStatus) *corev1.PodCondition { + _, condition := k8sutil.GetPodCondition(&status, PodNeedUpgrade) + return condition +} diff --git a/pkg/controller/daemonpodupdater/util_test.go b/pkg/controller/daemonpodupdater/util_test.go new file mode 100644 index 00000000000..524caa90378 --- /dev/null +++ b/pkg/controller/daemonpodupdater/util_test.go @@ -0,0 +1,230 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package daemonpodupdater
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func TestGetDaemonsetPods(t *testing.T) {
+	ds1 := newDaemonSet("daemonset1", "foo/bar:v1")
+
+	pod1 := newPod("pod1", "", simpleDaemonSetLabel, ds1)
+	pod2 := newPod("pod2", "", simpleDaemonSetLabel, nil)
+
+	expectPods := []*corev1.Pod{pod1}
+	clientset := fake.NewSimpleClientset(ds1, pod1, pod2)
+	podInformer := informers.NewSharedInformerFactory(clientset, 0)
+
+	podInformer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
+	podInformer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
+
+	gotPods, err := GetDaemonsetPods(podInformer.Core().V1().Pods().Lister(), ds1)
+
+	assert.NoError(t, err)
+	assert.Equal(t, expectPods, gotPods)
+}
+
+func TestIsDaemonsetPodLatest(t *testing.T) {
+	daemonsetV1 := newDaemonSet("daemonset", "foo/bar:v1")
+	daemonsetV2 := daemonsetV1.DeepCopy()
+	daemonsetV2.Spec.Template.Spec.Containers[0].Image = "foo/bar:v2"
+
+	tests := []struct {
+		name       string
+		ds         *appsv1.DaemonSet
+		pod        *corev1.Pod
+		wantLatest bool
+	}{
+		{
+			name:       "latest",
+			ds:         daemonsetV1,
+			pod:        newPod("pod", "", simpleDaemonSetLabel, daemonsetV1),
+			wantLatest: true,
+		},
+		{
+			name:       "not latest",
+			ds:         daemonsetV2,
+			pod:        newPod("pod", "", simpleDaemonSetLabel, daemonsetV1),
+			wantLatest: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			gotLatest := IsDaemonsetPodLatest(tt.ds, tt.pod)
+			assert.Equal(t, tt.wantLatest, gotLatest)
+		})
+	}
+}
+
+func Test_checkPrerequisites(t *testing.T) {
+	tests := []struct {
+		name string
+		ds   *appsv1.DaemonSet
+		want bool
+	}{
+		{
+			name: "satisfied-ota",
+			ds: &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						"apps.openyurt.io/update-strategy": "ota",
+					},
+				},
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+						Type: appsv1.OnDeleteDaemonSetStrategyType,
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "satisfied-auto",
+			ds: &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						"apps.openyurt.io/update-strategy": "auto",
+					},
+				},
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+						Type: appsv1.OnDeleteDaemonSetStrategyType,
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "unsatisfied-other",
+			ds: &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						"apps.openyurt.io/update-strategy": "other",
+					},
+				},
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+						Type: appsv1.OnDeleteDaemonSetStrategyType,
+					},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "unsatisfied-without-ann",
+			ds: &appsv1.DaemonSet{
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+						Type: appsv1.OnDeleteDaemonSetStrategyType,
+					},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "unsatisfied-without-updateStrategy",
+			ds: &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						"apps.openyurt.io/update-strategy": "other",
+					},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "unsatisfied-without-both",
+			ds:   &appsv1.DaemonSet{},
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := checkPrerequisites(tt.ds); got != tt.want {
+				t.Errorf("checkPrerequisites() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestGetTargetNodeName(t *testing.T) {
+	pod := newPod("pod", "", nil, nil)
+
+	podWithName := pod.DeepCopy()
+	podWithName.Spec.NodeName = "node"
+
+	podWithAffinity := pod.DeepCopy()
+	podWithAffinity.Spec.Affinity = &corev1.Affinity{
+		NodeAffinity: &corev1.NodeAffinity{
+			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
+				NodeSelectorTerms: []corev1.NodeSelectorTerm{
+					{
+						MatchFields: []corev1.NodeSelectorRequirement{
+							{
+								Key:      metav1.ObjectNameField,
+								Operator: corev1.NodeSelectorOpIn,
+								Values:   []string{"affinity"},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tests := []struct {
+		pod     *corev1.Pod
+		want    string
+		wantErr bool
+	}{
+		{
+			pod:     pod,
+			want:    "",
+			wantErr: true,
+		},
+		{
+			pod:     podWithName,
+			want:    "node",
+			wantErr: false,
+		},
+		{
+			pod:     podWithAffinity,
+			want:    "affinity",
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run("", func(t *testing.T) {
+			got, err := GetTargetNodeName(tt.pod)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("GetTargetNodeName() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("GetTargetNodeName() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
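
The following is a minimal sketch, not part of the patch itself, of the opt-in contract that checkPrerequisites enforces: a DaemonSet is only handled by the daemon pod updater when it carries the "apps.openyurt.io/update-strategy" annotation with value "auto" or "ota" and uses the OnDelete update strategy. It assumes placement in package daemonpodupdater (for access to the unexported checkPrerequisites); the test name and DaemonSet name are illustrative only.

package daemonpodupdater

import (
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestCheckPrerequisitesSketch (hypothetical name) walks through the two prerequisites.
func TestCheckPrerequisitesSketch(t *testing.T) {
	ds := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: "example-ds", // illustrative name
			Annotations: map[string]string{
				"apps.openyurt.io/update-strategy": "auto", // or "ota"
			},
		},
		Spec: appsv1.DaemonSetSpec{
			UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
				// OnDelete keeps the built-in daemonset controller from rolling pods on its
				// own; pods are only replaced after they are explicitly deleted.
				Type: appsv1.OnDeleteDaemonSetStrategyType,
			},
		},
	}
	if !checkPrerequisites(ds) {
		t.Errorf("expected DaemonSet %q to satisfy the update prerequisites", ds.Name)
	}

	// Dropping the annotation or switching back to RollingUpdate opts the DaemonSet out.
	ds.Spec.UpdateStrategy.Type = appsv1.RollingUpdateDaemonSetStrategyType
	if checkPrerequisites(ds) {
		t.Errorf("expected DaemonSet %q with RollingUpdate strategy to be skipped", ds.Name)
	}
}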
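
Similarly, a short sketch, under the same package assumption, of how the PodNeedUpgrade condition maintained by SetPodUpgradeCondition can be consumed through the IsPodUpdatable helper; the condition is True exactly when the pod is no longer running the DaemonSet's latest template. The test name is illustrative.

package daemonpodupdater

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
)

// TestIsPodUpdatableSketch (hypothetical name) shows how the helper maps the condition.
func TestIsPodUpdatableSketch(t *testing.T) {
	// A pod whose PodNeedUpgrade condition is True is reported as updatable.
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{
				{Type: PodNeedUpgrade, Status: corev1.ConditionTrue},
			},
		},
	}
	if !IsPodUpdatable(pod) {
		t.Errorf("expected pod with PodNeedUpgrade=True to be updatable")
	}

	// Once the pod runs the latest template, the condition is set to False and the
	// helper reports the pod as not updatable.
	pod.Status.Conditions[0].Status = corev1.ConditionFalse
	if IsPodUpdatable(pod) {
		t.Errorf("expected pod with PodNeedUpgrade=False to be reported as not updatable")
	}
}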