From dab05539a4ad251ffa879bad9a99f84c51588a87 Mon Sep 17 00:00:00 2001
From: hxcGit
Date: Fri, 26 Aug 2022 09:42:09 +0800
Subject: [PATCH] add auto pod upgrade controller for daemonset

Signed-off-by: hxcGit
---
 .../app/controllermanager.go                  |   2 +-
 cmd/yurt-controller-manager/app/core.go       |  15 +
 go.mod                                        |   1 +
 .../podupgrade/pod_upgrade_controller.go      | 344 ++++++++++++++
 .../podupgrade/pod_upgrade_controller_test.go | 432 ++++++++++++++++++
 pkg/controller/podupgrade/util.go             | 222 +++++++++
 pkg/controller/podupgrade/util_test.go        |  45 ++
 7 files changed, 1060 insertions(+), 1 deletion(-)
 create mode 100644 pkg/controller/podupgrade/pod_upgrade_controller.go
 create mode 100644 pkg/controller/podupgrade/pod_upgrade_controller_test.go
 create mode 100644 pkg/controller/podupgrade/util.go
 create mode 100644 pkg/controller/podupgrade/util_test.go

diff --git a/cmd/yurt-controller-manager/app/controllermanager.go b/cmd/yurt-controller-manager/app/controllermanager.go
index 0354dc0bf8c..b59fee63747 100644
--- a/cmd/yurt-controller-manager/app/controllermanager.go
+++ b/cmd/yurt-controller-manager/app/controllermanager.go
@@ -305,7 +305,7 @@ func NewControllerInitializers() map[string]InitFunc {
 	controllers := map[string]InitFunc{}
 	controllers["nodelifecycle"] = startNodeLifecycleController
 	controllers["yurtcsrapprover"] = startYurtCSRApproverController
-
+	controllers["podupgrade"] = startPodUpgradeController
 	return controllers
 }
 
diff --git a/cmd/yurt-controller-manager/app/core.go b/cmd/yurt-controller-manager/app/core.go
index f84777b4d3c..7fd0c150df8 100644
--- a/cmd/yurt-controller-manager/app/core.go
+++ b/cmd/yurt-controller-manager/app/core.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/openyurtio/openyurt/pkg/controller/certificates"
 	lifecyclecontroller "github.com/openyurtio/openyurt/pkg/controller/nodelifecycle"
+	podupgradecontroller "github.com/openyurtio/openyurt/pkg/controller/podupgrade"
 )
 
 func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
@@ -65,3 +66,17 @@ func startYurtCSRApproverController(ctx ControllerContext) (http.Handler, bool,
 
 	return nil, true, nil
 }
+
+func startPodUpgradeController(ctx ControllerContext) (http.Handler, bool, error) {
+	podUpgradeCtrl := podupgradecontroller.NewController(
+		ctx.ClientBuilder.ClientOrDie("podUpgrade-controller"),
+		ctx.InformerFactory.Apps().V1().DaemonSets(),
+		ctx.InformerFactory.Core().V1().Nodes(),
+		ctx.InformerFactory.Core().V1().Pods(),
+		ctx.InformerFactory.Apps().V1().ControllerRevisions(),
+	)
+
+	go podUpgradeCtrl.Run(2, ctx.Stop)
+
+	return nil, true, nil
+}
diff --git a/go.mod b/go.mod
index b7bd9e50103..4cc30637a49 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/Masterminds/semver/v3 v3.1.1
 	github.com/Microsoft/go-winio v0.4.15
 	github.com/aliyun/alibaba-cloud-sdk-go v1.61.579
+	github.com/davecgh/go-spew v1.1.1
 	github.com/daviddengcn/go-colortext v1.0.0
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/emicklei/go-restful v2.12.0+incompatible // indirect
diff --git a/pkg/controller/podupgrade/pod_upgrade_controller.go b/pkg/controller/podupgrade/pod_upgrade_controller.go
new file mode 100644
index 00000000000..1902e021003
--- /dev/null
+++ b/pkg/controller/podupgrade/pod_upgrade_controller.go
@@ -0,0 +1,344 @@
+/*
+Copyright 2022 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podupgrade
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	appsinformers "k8s.io/client-go/informers/apps/v1"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	client "k8s.io/client-go/kubernetes"
+	appslisters "k8s.io/client-go/listers/apps/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+)
+
+type Controller struct {
+	kubeclientset client.Interface
+
+	daemonsetLister          appslisters.DaemonSetLister
+	daemonsetSynced          cache.InformerSynced
+	nodeLister               corelisters.NodeLister
+	nodeSynced               cache.InformerSynced
+	podLister                corelisters.PodLister
+	podSynced                cache.InformerSynced
+	controllerRevisionLister appslisters.ControllerRevisionLister
+	controllerRevisionSynced cache.InformerSynced
+
+	daemonsetWorkqueue workqueue.RateLimitingInterface
+	nodeWorkqueue      workqueue.Interface
+}
+
+func NewController(kc client.Interface, daemonsetInformer appsinformers.DaemonSetInformer,
+	nodeInformer coreinformers.NodeInformer, podInformer coreinformers.PodInformer,
+	crInformer appsinformers.ControllerRevisionInformer) *Controller {
+	// TODO: Is eventBroadCaster needed?
+	ctrl := Controller{
+		kubeclientset:   kc,
+		daemonsetLister: daemonsetInformer.Lister(),
+		daemonsetSynced: daemonsetInformer.Informer().HasSynced,
+
+		nodeLister: nodeInformer.Lister(),
+		nodeSynced: nodeInformer.Informer().HasSynced,
+
+		podLister: podInformer.Lister(),
+		podSynced: podInformer.Informer().HasSynced,
+
+		controllerRevisionLister: crInformer.Lister(),
+		controllerRevisionSynced: crInformer.Informer().HasSynced,
+
+		daemonsetWorkqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		nodeWorkqueue:      workqueue.New(),
+	}
+
+	daemonsetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(old, new interface{}) {
+			newDS := new.(*appsv1.DaemonSet)
+			oldDS := old.(*appsv1.DaemonSet)
+
+			// Only handle daemonsets with the annotation "apps.openyurt.io/upgrade-strategy"
+			if !metav1.HasAnnotation(newDS.ObjectMeta, UpgradeAnnotation) || !metav1.HasAnnotation(oldDS.ObjectMeta, UpgradeAnnotation) {
+				return
+			}
+
+			// TODO: change to compare generation and revision hash
+			if newDS.ResourceVersion == oldDS.ResourceVersion || reflect.DeepEqual(newDS.Spec.Template.Spec, oldDS.Spec.Template.Spec) {
+				return
+			}
+
+			ctrl.enqueueDaemonset(new)
+		},
+	})
+
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(old, new interface{}) {
+			oldNode := old.(*corev1.Node)
+			newNode := new.(*corev1.Node)
+			if !NodeReady(&oldNode.Status) && NodeReady(&newNode.Status) {
+				klog.Infof("node %q turns ready", newNode.Name)
+				ctrl.nodeWorkqueue.Add(newNode.Name)
+			}
+		},
+	})
+	return &ctrl
+}
+
+func (c *Controller) enqueueDaemonset(obj interface{}) {
+	var key string
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+		utilruntime.HandleError(err)
+		return
+	}
+
+	klog.Infof("Got update event: %v", key)
+	c.daemonsetWorkqueue.AddRateLimited(key)
+}
+
+func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	klog.Info("Starting pod upgrade controller")
+	defer klog.Info("Shutting down pod upgrade controller")
+	defer c.daemonsetWorkqueue.ShutDown()
+	defer c.nodeWorkqueue.ShutDown()
+
+	// synchronize the cache before starting to process events
+	if !cache.WaitForCacheSync(stopCh, c.daemonsetSynced, c.nodeSynced,
+		c.podSynced, c.controllerRevisionSynced) {
+		klog.Error("sync podupgrade controller timeout")
+	}
+
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(c.runDaemonsetWorker, time.Second, stopCh)
+	}
+
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(c.runNodeWorker, time.Second, stopCh)
+	}
+
+	<-stopCh
+}
+
+func (c *Controller) runDaemonsetWorker() {
+	for {
+		obj, shutdown := c.daemonsetWorkqueue.Get()
+		if shutdown {
+			return
+		}
+
+		if err := c.syncDaemonsetHandler(obj.(string)); err != nil {
+			utilruntime.HandleError(err)
+		}
+		c.daemonsetWorkqueue.Forget(obj)
+		c.daemonsetWorkqueue.Done(obj)
+	}
+}
+
+func (c *Controller) runNodeWorker() {
+	for {
+		obj, shutdown := c.nodeWorkqueue.Get()
+		if shutdown {
+			return
+		}
+
+		nodeName := obj.(string)
+		if err := c.syncNodeHandler(nodeName); err != nil {
+			utilruntime.HandleError(err)
+		}
+
+		c.nodeWorkqueue.Done(nodeName)
+	}
+}
+
+func (c *Controller) syncDaemonsetHandler(key string) error {
+	defer func() {
+		klog.V(4).Infof("Finish syncing pod upgrade request %q", key)
+	}()
+
+	klog.V(4).Infof("Start handling pod upgrade request %q", key)
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
+		return nil
+	}
+
+	// the daemonset to be synced
+	ds, err := c.daemonsetLister.DaemonSets(namespace).Get(name)
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	pods, err := GetDaemonsetPods(c.podLister, ds)
+	if err != nil {
+		return err
+	}
+
+	// recheck required annotation
+	v, ok := ds.Annotations[UpgradeAnnotation]
+	if !ok {
+		return fmt.Errorf("won't sync daemonset %q without annotation 'apps.openyurt.io/upgrade-strategy'", ds.Name)
+	}
+
+	switch v {
+	case OTAUpgrade:
+		if err := c.checkOTAUpgrade(ds, pods); err != nil {
+			return err
+		}
+
+	case AutoUpgrade:
+		if err := c.autoUpgrade(ds, pods); err != nil {
+			return err
+		}
+	default:
+		// unrecognized upgrade strategy value
+		return fmt.Errorf("unknown annotation type %v", v)
+	}
+
+	return nil
+}
+
+// checkOTAUpgrade compares every pod with its owner daemonset to check whether the pod is upgradable
+// If the pod is not up to date with the latest daemonset version, set annotation "apps.openyurt.io/pod-upgradable" to "true"
+// Otherwise, set annotation "apps.openyurt.io/pod-upgradable" to "false"
+func (c *Controller) checkOTAUpgrade(ds *appsv1.DaemonSet, pods []*corev1.Pod) error {
+	for _, pod := range pods {
+		if err := SetPodUpgradeAnnotation(c.kubeclientset, c.controllerRevisionLister, ds, pod); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// autoUpgrade performs the pod upgrade operation when
+// 1. the pod is not up to date with the daemonset (checked via IsDaemonsetPodLatest)
+// 2. the pod's node is ready
+func (c *Controller) autoUpgrade(ds *appsv1.DaemonSet, pods []*corev1.Pod) error {
+	for _, pod := range pods {
+		latestOK, err := IsDaemonsetPodLatest(c.controllerRevisionLister, ds, pod)
+		if err != nil {
+			return err
+		}
+
+		nodeOK, err := NodeReadyByName(c.kubeclientset, pod.Spec.NodeName)
+		if err != nil {
+			return err
+		}
+
+		if !latestOK && nodeOK {
+			if err := c.kubeclientset.CoreV1().Pods(pod.Namespace).
+				Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
+				return err
+			}
+			klog.Infof("Auto upgrade pod %v/%v", ds.Name, pod.Name)
+		}
+	}
+
+	return nil
+}
+
+// syncNodeHandler deletes the daemonset pods that need to be upgraded when a node turns ready
+func (c *Controller) syncNodeHandler(key string) error {
+	defer func() {
+		klog.V(4).Infof("Finish syncing pod upgrade request %q", key)
+	}()
+
+	klog.V(4).Infof("Start handling pod upgrade request %q", key)
+
+	node, err := c.nodeLister.Get(key)
+	if err != nil {
+		// If node not found, just ignore it.
+		if apierrors.IsNotFound(err) {
+			klog.V(5).Infof("Sync node %v not found", key)
+			return nil
+		}
+		return err
+	}
+
+	// get all pods on this node
+	pods, err := GetNodePods(c.podLister, node)
+	if err != nil {
+		return err
+	}
+
+	if err := c.upgradePodsWhenNodeReady(c.kubeclientset, pods); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// upgradePodsWhenNodeReady checks which pods on the current node need to be upgraded
+// Only workloads with annotation "apps.openyurt.io/upgrade-strategy" and updateStrategy "OnDelete" will be processed
+func (c *Controller) upgradePodsWhenNodeReady(clientset client.Interface, pods []*corev1.Pod) error {
+	for _, pod := range pods {
+		owner := metav1.GetControllerOf(pod)
+		switch {
+		case owner != nil && owner.Kind == DaemonSet:
+			ds, err := clientset.AppsV1().DaemonSets(pod.Namespace).Get(context.TODO(), owner.Name, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+
+			// consider only the case with annotation "apps.openyurt.io/upgrade-strategy"
+			v, ok := ds.Annotations[UpgradeAnnotation]
+			// consider only the case with updateStrategy "OnDelete"
+			updateStrategyOK := (ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType)
+			if ok && updateStrategyOK {
+				switch v {
+				// set pod annotation when "apps.openyurt.io/upgrade-strategy=ota"
+				case OTAUpgrade:
+					if err := SetPodUpgradeAnnotation(clientset, c.controllerRevisionLister, ds, pod); err != nil {
+						return err
+					}
+
+				// auto upgrade pod when "apps.openyurt.io/upgrade-strategy=auto"
+				case AutoUpgrade:
+					latestOK, err := IsDaemonsetPodLatest(c.controllerRevisionLister, ds, pod)
+					if err != nil {
+						return err
+					}
+					if !latestOK {
+						if err := clientset.CoreV1().Pods(pod.Namespace).
+							Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
+							return err
+						}
+						klog.Infof("Auto upgrade pod %v/%v", ds.Name, pod.Name)
+					}
+				}
+			}
+		default:
+			continue
+		}
+	}
+	return nil
+}
diff --git a/pkg/controller/podupgrade/pod_upgrade_controller_test.go b/pkg/controller/podupgrade/pod_upgrade_controller_test.go
new file mode 100644
index 00000000000..1b31d3a6e48
--- /dev/null
+++ b/pkg/controller/podupgrade/pod_upgrade_controller_test.go
@@ -0,0 +1,432 @@
+/*
+Copyright 2022 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podupgrade
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"hash"
+	"hash/fnv"
+	"reflect"
+	"testing"
+
+	"github.com/davecgh/go-spew/spew"
+	"github.com/stretchr/testify/assert"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/apiserver/pkg/storage/names"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	ktest "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/cache"
+)
+
+var (
+	simpleDaemonSetLabel = map[string]string{"foo": "bar"}
+	alwaysReady          = func() bool { return true }
+)
+
+var controllerKind = appsv1.SchemeGroupVersion.WithKind("DaemonSet")
+
+var (
+	SyncDaemonset = "SyncDaemonset"
+	SyncNode      = "SyncNode"
+)
+
+type fixture struct {
+	t               *testing.T
+	client          *fake.Clientset
+	podLister       []*corev1.Pod
+	nodeLister      []*corev1.Node
+	daemonsetLister []*appsv1.DaemonSet
+	crLister        []*appsv1.ControllerRevision
+	actions         []ktest.Action
+	objects         []runtime.Object
+}
+
+func newFixture(t *testing.T) *fixture {
+	return &fixture{
+		t:       t,
+		objects: []runtime.Object{},
+	}
+}
+
+func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, informers.SharedInformerFactory) {
+	f.client = fake.NewSimpleClientset(f.objects...)
+
+	dsInformer := informers.NewSharedInformerFactory(f.client, 0)
+	nodeInformer := informers.NewSharedInformerFactory(f.client, 0)
+	podInformer := informers.NewSharedInformerFactory(f.client, 0)
+	crInformer := informers.NewSharedInformerFactory(f.client, 0)
+
+	for _, d := range f.daemonsetLister {
+		dsInformer.Apps().V1().DaemonSets().Informer().GetIndexer().Add(d)
+	}
+	for _, n := range f.nodeLister {
+		nodeInformer.Core().V1().Nodes().Informer().GetIndexer().Add(n)
+	}
+	for _, p := range f.podLister {
+		podInformer.Core().V1().Pods().Informer().GetIndexer().Add(p)
+	}
+	for _, c := range f.crLister {
+		crInformer.Apps().V1().ControllerRevisions().Informer().GetIndexer().Add(c)
+	}
+
+	c := NewController(f.client, dsInformer.Apps().V1().DaemonSets(), nodeInformer.Core().V1().Nodes(),
+		podInformer.Core().V1().Pods(), crInformer.Apps().V1().ControllerRevisions())
+
+	c.daemonsetSynced = alwaysReady
+	c.nodeSynced = alwaysReady
+
+	return c, dsInformer, nodeInformer
+}
+
+// run executes the controller logic
+// kind=SyncDaemonset tests the daemonset update event
+// kind=SyncNode tests the node ready event
+func (f *fixture) run(key string, kind string) {
+	f.testController(key, kind, false)
+}
+
+func (f *fixture) runExpectError(key string, kind string) {
+	f.testController(key, kind, true)
+}
+
+func (f *fixture) testController(key string, kind string, expectError bool) {
+	c, dsInformer, nodeInformer := f.newController()
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	dsInformer.Start(stopCh)
+	nodeInformer.Start(stopCh)
+
+	var err error
+	switch kind {
+	case SyncDaemonset:
+		err = c.syncDaemonsetHandler(key)
+
+	case SyncNode:
+		err = c.syncNodeHandler(key)
+	}
+
+	if !expectError && err != nil {
+		f.t.Errorf("error syncing: %v", err)
+	} else if expectError && err == nil {
+		f.t.Error("expected error syncing, got nil")
+
+	}
+
+	// make sure all the expected actions occurred during the controller sync process
+	for _, action := range f.actions {
+		findAndCheckAction(action, f.client.Actions(), f.t)
+	}
+}
+
+// findAndCheckAction searches all the given actions to find whether the expected action exists
+func findAndCheckAction(expected ktest.Action, actions []ktest.Action, t *testing.T) {
+	for _, action := range actions {
+		if checkAction(expected, action) {
+			t.Logf("Check action %+v success", expected)
+			return
+		}
+	}
+	t.Errorf("Expected action %+v does not occur", expected)
+}
+
+// checkAction verifies that expected and actual actions are equal
+func checkAction(expected, actual ktest.Action) bool {
+	if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource)) ||
+		reflect.TypeOf(actual) != reflect.TypeOf(expected) ||
+		actual.GetSubresource() != expected.GetSubresource() {
+		return false
+	}
+
+	switch a := actual.(type) {
+	case ktest.DeleteAction:
+		e, _ := expected.(ktest.DeleteActionImpl)
+		expName := e.GetName()
+		actualName := a.GetName()
+		if expName != actualName {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (f *fixture) expectDeletePodAction(p *corev1.Pod) {
+	action := ktest.NewDeleteAction(schema.GroupVersionResource{Resource: "pods"}, p.Namespace, p.Name)
+	f.actions = append(f.actions, action)
+}
+
+func newDaemonSet(name string, img string) *appsv1.DaemonSet {
+	two := int32(2)
+	return &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:       uuid.NewUUID(),
+			Name:      name,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: appsv1.DaemonSetSpec{
+			RevisionHistoryLimit: &two,
+			UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+				Type: appsv1.OnDeleteDaemonSetStrategyType,
+			},
+			Selector: &metav1.LabelSelector{MatchLabels: simpleDaemonSetLabel},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: simpleDaemonSetLabel,
+				},
+				Spec: corev1.PodSpec{
+					Containers: []corev1.Container{{Image: img}},
+				},
+			},
+		},
+	}
+}
+
+func newPod(podName string, nodeName string, label map[string]string, ds *appsv1.DaemonSet) *corev1.Pod {
+	// Add hash unique label to the pod
+	newLabels := label
+	var podSpec corev1.PodSpec
+	// Copy pod spec from DaemonSet template, or use a default one if DaemonSet is nil
+	if ds != nil {
+		hash := ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)
+		newLabels = CloneAndAddLabel(label, appsv1.DefaultDaemonSetUniqueLabelKey, hash)
+		podSpec = ds.Spec.Template.Spec
+	} else {
+		podSpec = corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Image:                  "foo/bar",
+					TerminationMessagePath: corev1.TerminationMessagePathDefault,
+					ImagePullPolicy:        corev1.PullIfNotPresent,
+				},
+			},
+		}
+	}
+
+	// Add node name to the pod
+	if len(nodeName) > 0 {
+		podSpec.NodeName = nodeName
+	}
+
+	pod := &corev1.Pod{
+		TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
+		ObjectMeta: metav1.ObjectMeta{
+			GenerateName: podName,
+			Labels:       newLabels,
+			Namespace:    metav1.NamespaceDefault,
+		},
+		Spec: podSpec,
+	}
+	pod.Name = names.SimpleNameGenerator.GenerateName(podName)
+	if ds != nil {
+		pod.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)}
+	}
+	return pod
+}
+
+func newNode(name string) *corev1.Node {
+	return &corev1.Node{
+		TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: metav1.NamespaceNone,
+		},
+		Status: corev1.NodeStatus{
+			Conditions: []corev1.NodeCondition{
+				{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+			},
+			Allocatable: corev1.ResourceList{
+				corev1.ResourcePods: resource.MustParse("100"),
+			},
+		},
+	}
+}
+
+func newControllerRevision(name string, namespace string, label map[string]string, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
+	cr := &appsv1.ControllerRevision{
+		TypeMeta: metav1.TypeMeta{APIVersion: "apps/v1"},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Labels:    label,
+			Namespace: namespace,
+		},
+	}
+	if ds != nil {
+		cr.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)}
+	}
+	return cr
+}
+
+func setAutoUpgradeAnnotation(ds *appsv1.DaemonSet, annV string) {
+	metav1.SetMetaDataAnnotation(&ds.ObjectMeta, UpgradeAnnotation, annV)
+}
+
+func getDaemonsetKey(ds *appsv1.DaemonSet, t *testing.T) string {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(ds)
+	if err != nil {
+		t.Errorf("Unexpected error getting key for daemonset %v: %v", ds.Name, err)
+		return ""
+	}
+	return key
+}
+
+// ComputeHash returns a hash value calculated from pod template and
+// a collisionCount to avoid hash collision. The hash will be safe encoded to
+// avoid bad words.
+func ComputeHash(template *corev1.PodTemplateSpec, collisionCount *int32) string {
+	podTemplateSpecHasher := fnv.New32a()
+	DeepHashObject(podTemplateSpecHasher, *template)
+
+	// Add collisionCount in the hash if it exists.
+	if collisionCount != nil {
+		collisionCountBytes := make([]byte, 8)
+		binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount))
+		podTemplateSpecHasher.Write(collisionCountBytes)
+	}
+
+	return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
+}
+
+// DeepHashObject writes specified object to hash using the spew library
+// which follows pointers and prints actual values of the nested objects
+// ensuring the hash does not change when a pointer changes.
+func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) {
+	hasher.Reset()
+	printer := spew.ConfigState{
+		Indent:         " ",
+		SortKeys:       true,
+		DisableMethods: true,
+		SpewKeys:       true,
+	}
+	printer.Fprintf(hasher, "%#v", objectToWrite)
+}
+
+// Clones the given map and returns a new map with the given key and value added.
+// Returns the given map, if labelKey is empty.
+func CloneAndAddLabel(labels map[string]string, labelKey, labelValue string) map[string]string {
+	if labelKey == "" {
+		// Don't need to add a label.
+		return labels
+	}
+	// Clone.
+	newLabels := map[string]string{}
+	for key, value := range labels {
+		newLabels[key] = value
+	}
+	newLabels[labelKey] = labelValue
+	return newLabels
+}
+
+func TestAutoUpgradeWithDaemonsetUpdate(t *testing.T) {
+	f := newFixture(t)
+	ds := newDaemonSet("test-ds", "foo/bar:v1")
+	setAutoUpgradeAnnotation(ds, "auto")
+	node := newNode("test-node")
+	pod := newPod("test-pod", "test-node", simpleDaemonSetLabel, ds)
+
+	ds.Spec.Template.Spec.Containers[0].Image = "foo/bar:v2"
+	hash := ComputeHash(&ds.Spec.Template, nil)
+	label := CloneAndAddLabel(ds.Spec.Template.Labels, appsv1.DefaultDaemonSetUniqueLabelKey, hash)
+	cr := newControllerRevision("test-cr", ds.Namespace, label, ds)
+
+	f.daemonsetLister = append(f.daemonsetLister, ds)
+	f.podLister = append(f.podLister, pod)
+	f.crLister = append(f.crLister, cr)
+
+	f.objects = append(f.objects, ds, pod, node, cr)
+
+	f.expectDeletePodAction(pod)
+
+	f.run(getDaemonsetKey(ds, t), SyncDaemonset)
+}
+
+func TestAutoUpgradeWithNodeReady(t *testing.T) {
+	f := newFixture(t)
+	ds := newDaemonSet("test-ds", "foo/bar")
+	setAutoUpgradeAnnotation(ds, "auto")
+	node := newNode("test-node")
+	pod := newPod("test-pod", "test-node", simpleDaemonSetLabel, ds)
+
+	ds.Spec.Template.Spec.Containers[0].Image = "foo/bar:v2"
+	hash := ComputeHash(&ds.Spec.Template, nil)
+	label := CloneAndAddLabel(ds.Spec.Template.Labels, appsv1.DefaultDaemonSetUniqueLabelKey, hash)
+	cr := newControllerRevision("test-cr", ds.Namespace, label, ds)
+
+	f.daemonsetLister = append(f.daemonsetLister, ds)
+	f.nodeLister = append(f.nodeLister, node)
+	f.podLister = append(f.podLister, pod)
+	f.crLister = append(f.crLister, cr)
+
+	f.objects = append(f.objects, ds, pod, node, cr)
+
+	f.expectDeletePodAction(pod)
+
+	f.run(node.Name, SyncNode)
+}
+
+func TestOTAUpgrade(t *testing.T) {
+	f := newFixture(t)
+	ds := newDaemonSet("test-ds", "foo/bar:v1")
+	setAutoUpgradeAnnotation(ds, "ota")
+	oldPod := newPod("old-pod", "test-node", simpleDaemonSetLabel, ds)
+
+	hash := ComputeHash(&ds.Spec.Template, nil)
+	label := CloneAndAddLabel(ds.Spec.Template.Labels, appsv1.DefaultDaemonSetUniqueLabelKey, hash)
+	cr := newControllerRevision("test-cr", ds.Namespace, label, ds)
+
+	ds.Spec.Template.Spec.Containers[0].Image = "foo/bar:v2"
+	updatedPod := newPod("new-pod", "test-node", simpleDaemonSetLabel, ds)
+	hash = ComputeHash(&ds.Spec.Template, nil)
+	label = CloneAndAddLabel(ds.Spec.Template.Labels, appsv1.DefaultDaemonSetUniqueLabelKey, hash)
+	cr2 := newControllerRevision("test-cr2", ds.Namespace, label, ds)
+
+	f.daemonsetLister = append(f.daemonsetLister, ds)
+	f.podLister = append(f.podLister, oldPod, updatedPod)
+	f.crLister = append(f.crLister, cr, cr2)
+	f.objects = append(f.objects, ds, oldPod, updatedPod, cr, cr2)
+
+	f.run(getDaemonsetKey(ds, t), SyncDaemonset)
+
+	// check whether the ota upgradable annotation is set properly
+	oldPodGot, err := f.client.CoreV1().Pods(ds.Namespace).Get(context.TODO(), oldPod.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Errorf("get oldPod failed, %+v", err)
+	}
+
+	newPodGot, err := f.client.CoreV1().Pods(ds.Namespace).Get(context.TODO(), updatedPod.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Errorf("get newPod failed, %+v", err)
+	}
+
+	annOldPodGot, oldPodOK := oldPodGot.Annotations[PodUpgradableAnnotation]
+	assert.Equal(t, true, oldPodOK)
+	assert.Equal(t, "true", annOldPodGot)
+
+	annNewPodGot, newPodOK := newPodGot.Annotations[PodUpgradableAnnotation]
+	assert.Equal(t, true, newPodOK)
+	assert.Equal(t, "false", annNewPodGot)
+}
diff --git a/pkg/controller/podupgrade/util.go b/pkg/controller/podupgrade/util.go
new file mode 100644
index 00000000000..2315c32dad0
--- /dev/null
+++ b/pkg/controller/podupgrade/util.go
@@ -0,0 +1,222 @@
+/*
+Copyright 2022 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podupgrade
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strconv"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	extensions "k8s.io/api/extensions/v1beta1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	client "k8s.io/client-go/kubernetes"
+	appslisters "k8s.io/client-go/listers/apps/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"
+)
+
+const (
+	UpgradeAnnotation       = "apps.openyurt.io/upgrade-strategy"
+	PodUpgradableAnnotation = "apps.openyurt.io/pod-upgradable"
+
+	OTAUpgrade  = "ota"
+	AutoUpgrade = "auto"
+
+	DaemonSet = "DaemonSet"
+)
+
+// GetDaemonsetPods gets all pods belonging to the given daemonset
+func GetDaemonsetPods(podLister corelisters.PodLister, ds *appsv1.DaemonSet) ([]*corev1.Pod, error) {
+	dsPods := make([]*corev1.Pod, 0)
+	dsPodsNames := make([]string, 0)
+	pods, err := podLister.Pods(ds.Namespace).List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+
+	for i, pod := range pods {
+		owner := metav1.GetControllerOf(pod)
+		if owner != nil && owner.UID == ds.UID {
+			dsPods = append(dsPods, pods[i])
+			dsPodsNames = append(dsPodsNames, pod.Name)
+		}
+	}
+
+	if len(dsPods) > 0 {
+		klog.V(5).Infof("Daemonset %v has pods %v", ds.Name, dsPodsNames)
+	}
+	return dsPods, nil
+}
+
+// GetNodePods gets all pods running on the given node
+func GetNodePods(podLister corelisters.PodLister, node *corev1.Node) ([]*corev1.Pod, error) {
+	nodePods := make([]*corev1.Pod, 0)
+	nodePodsNames := make([]string, 0)
+
+	pods, err := podLister.List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+
+	for i, pod := range pods {
+		if pod.Spec.NodeName == node.Name {
+			nodePods = append(nodePods, pods[i])
+			nodePodsNames = append(nodePodsNames, pod.Name)
+		}
+	}
+
+	if len(nodePodsNames) > 0 {
+		klog.V(5).Infof("Node %v has pods %v", node.Name, nodePodsNames)
+	}
+	return nodePods, nil
+}
+
+// IsDaemonsetPodLatest checks whether the pod is up to date by comparing its labels with the daemonset's latest controller revision
+// If the pod is latest, return true, otherwise return false
+func IsDaemonsetPodLatest(crLister appslisters.ControllerRevisionLister, ds *appsv1.DaemonSet, pod *corev1.Pod) (bool, error) {
+	historyRevision, err := GetDaemonsetLatestControllerRevision(crLister, ds)
+	if err != nil {
+		return false, err
+	}
+	if historyRevision == nil {
+		return false, fmt.Errorf("daemonset %v does not have any controller revision", ds.Name)
+	}
+
+	// Using "controller-revision-hash" and "deprecated.daemonset.template.generation" to check if upgrade is available
+	hash := historyRevision.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
+	generation, err := GetTemplateGeneration(ds)
+	if err != nil {
+		return false, err
+	}
+
+	klog.V(5).Infof("daemonset %v revision hash is %v", ds.Name, hash)
+	klog.V(5).Infof("daemonset %v generation is %v", ds.Name, generation)
+
+	templateMatches := generation != nil && pod.Labels[extensions.DaemonSetTemplateGenerationKey] == fmt.Sprint(*generation)
+	hashMatches := len(hash) > 0 && pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey] == hash
+	return hashMatches || templateMatches, nil
+}
+
+// GetDaemonsetLatestControllerRevision gets the latest controller revision of the given daemonset
+func GetDaemonsetLatestControllerRevision(crLister appslisters.ControllerRevisionLister, ds *appsv1.DaemonSet) (*appsv1.ControllerRevision, error) {
+	histories, err := GetDaemonsetControllerRevisions(crLister, ds)
+	if err != nil {
+		return nil, err
+	}
+	if len(histories) <= 0 {
+		return nil, nil
+	}
+	sort.Stable(byRevision(histories))
+	return histories[len(histories)-1], nil
+}
+
+// GetDaemonsetControllerRevisions gets all controller revisions of the given daemonset
+func GetDaemonsetControllerRevisions(crLister appslisters.ControllerRevisionLister, ds *appsv1.DaemonSet) (
+	[]*appsv1.ControllerRevision, error) {
+	histories, err := crLister.ControllerRevisions(ds.Namespace).List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+
+	var historyRevisions []*appsv1.ControllerRevision
+	for i, h := range histories {
+		owner := metav1.GetControllerOf(h)
+		if owner != nil && owner.UID == ds.UID {
+			historyRevisions = append(historyRevisions, histories[i])
+		}
+	}
+
+	return historyRevisions, nil
+}
+
+// GetTemplateGeneration gets annotation "deprecated.daemonset.template.generation" of the given daemonset
+func GetTemplateGeneration(ds *appsv1.DaemonSet) (*int64, error) {
+	annotation, found := ds.Annotations[appsv1.DeprecatedTemplateGeneration]
+	if !found {
+		return nil, nil
+	}
+	generation, err := strconv.ParseInt(annotation, 10, 64)
+	if err != nil {
+		return nil, err
+	}
+	return &generation, nil
+}
+
+func NodeReadyByName(client client.Interface, nodeName string) (bool, error) {
+	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+	if err != nil {
+		return false, err
+	}
+
+	return NodeReady(&node.Status), nil
+}
+
+// NodeReady checks if the given node is ready
+func NodeReady(nodeStatus *corev1.NodeStatus) bool {
+	for _, cond := range nodeStatus.Conditions {
+		if cond.Type == corev1.NodeReady {
+			return cond.Status == corev1.ConditionTrue
+		}
+	}
+	return false
+}
+
+func SetPodUpgradeAnnotation(clientset client.Interface, crLister appslisters.ControllerRevisionLister, ds *appsv1.DaemonSet, pod *corev1.Pod) error {
+	ok, err := IsDaemonsetPodLatest(crLister, ds, pod)
+	if err != nil {
+		return err
+	}
+
+	var res bool
+	if !ok {
+		res = true
+	}
+
+	metav1.SetMetaDataAnnotation(&pod.ObjectMeta, PodUpgradableAnnotation, strconv.FormatBool(res))
+	if _, err := clientset.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
+		return err
+	}
+	klog.Infof("set pod %q annotation apps.openyurt.io/pod-upgradable to %v", pod.Name, res)
+
+	return nil
+}
+
+// byRevision implements sort.Interface to allow ControllerRevisions to be sorted by Revision.
+type byRevision []*appsv1.ControllerRevision
+
+func (br byRevision) Len() int {
+	return len(br)
+}
+
+// Less breaks ties first by creation timestamp, then by name
+func (br byRevision) Less(i, j int) bool {
+	if br[i].Revision == br[j].Revision {
+		if br[j].CreationTimestamp.Equal(&br[i].CreationTimestamp) {
+			return br[i].Name < br[j].Name
+		}
+		return br[j].CreationTimestamp.After(br[i].CreationTimestamp.Time)
+	}
+	return br[i].Revision < br[j].Revision
+}
+
+func (br byRevision) Swap(i, j int) {
+	br[i], br[j] = br[j], br[i]
+}
diff --git a/pkg/controller/podupgrade/util_test.go b/pkg/controller/podupgrade/util_test.go
new file mode 100644
index 00000000000..d1a38a4c4b5
--- /dev/null
+++ b/pkg/controller/podupgrade/util_test.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2022 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podupgrade
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func TestGetNodePods(t *testing.T) {
+	// Note: the fake client does not support field selector filtering, so pods are added to the indexer directly
+	node := newNode("test-node")
+	pod1 := newPod("test-pod1", "test-node", simpleDaemonSetLabel, nil)
+	pod2 := newPod("test-pod2", "test-node", simpleDaemonSetLabel, nil)
+
+	expectPods := []*corev1.Pod{pod1, pod2}
+	clientset := fake.NewSimpleClientset(node, pod1, pod2)
+	podInformer := informers.NewSharedInformerFactory(clientset, 0)
+
+	podInformer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
+	podInformer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
+
+	gotPods, err := GetNodePods(podInformer.Core().V1().Pods().Lister(), node)
+
+	assert.Equal(t, nil, err)
+	assert.Equal(t, expectPods, gotPods)
+}
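
Illustrative note (not part of the patch): with the "ota" strategy the controller only marks DaemonSet pods through the apps.openyurt.io/pod-upgradable annotation; the actual restart is left to an edge-side consumer or operator. A minimal sketch of such a consumer follows, assuming a standard client-go clientset; the function name checkAndUpgrade and its package layout are hypothetical and only show how the annotation could be consumed.

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// checkAndUpgrade is a hypothetical OTA-side consumer: it deletes a daemonset pod
// only when the pod upgrade controller has marked it upgradable, so that the
// daemonset (updateStrategy: OnDelete) recreates it from the latest template.
func checkAndUpgrade(ctx context.Context, clientset kubernetes.Interface, namespace, podName string) error {
	pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if pod.Annotations["apps.openyurt.io/pod-upgradable"] != "true" {
		// Nothing to do: the controller considers this pod up to date.
		return nil
	}
	return clientset.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
}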