Skip to content

Commit

Permalink
add unit tests for k8s utils
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <[email protected]>
  • Loading branch information
xavier-hou committed Sep 9, 2022
1 parent 22e2fef commit 16979d8
Show file tree
Hide file tree
Showing 12 changed files with 560 additions and 240 deletions.
2 changes: 1 addition & 1 deletion cmd/yurt-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func NewControllerInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["nodelifecycle"] = startNodeLifecycleController
controllers["yurtcsrapprover"] = startYurtCSRApproverController
controllers["podupgrade"] = startPodUpgradeController
controllers["daemonpodupdater"] = startDaemonPodUpdaterController
return controllers
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/yurt-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"time"

"github.com/openyurtio/openyurt/pkg/controller/certificates"
daemonpodupdater "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
lifecyclecontroller "github.com/openyurtio/openyurt/pkg/controller/nodelifecycle"
podupdater "github.com/openyurtio/openyurt/pkg/controller/podupdater"
)

func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
Expand Down Expand Up @@ -67,15 +67,15 @@ func startYurtCSRApproverController(ctx ControllerContext) (http.Handler, bool,
return nil, true, nil
}

func startPodUpgradeController(ctx ControllerContext) (http.Handler, bool, error) {
podUpgradeCtrl := podupdater.NewController(
func startDaemonPodUpdaterController(ctx ControllerContext) (http.Handler, bool, error) {
daemonPodUpdaterCtrl := daemonpodupdater.NewController(
ctx.ClientBuilder.ClientOrDie("podUpgrade-controller"),
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().Pods(),
)

go podUpgradeCtrl.Run(2, ctx.Stop)
go daemonPodUpdaterCtrl.Run(2, ctx.Stop)

return nil, true, nil
}
3 changes: 0 additions & 3 deletions cmd/yurt-controller-manager/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@ import (
"time"

"k8s.io/component-base/logs"

// for JSON log format registration
_ "k8s.io/component-base/logs/json/register"

// load all the prometheus client-go plugin
_ "k8s.io/component-base/metrics/prometheus/clientgo"

// for version metric registration
_ "k8s.io/component-base/metrics/prometheus/version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package podupdater
package daemonpodupdater

import (
"context"
Expand Down Expand Up @@ -42,26 +42,26 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

k8sutil "github.com/openyurtio/openyurt/pkg/controller/podupdater/kubernetes"
k8sutil "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater/kubernetes"
)

const (
// UpgradeAnnotation is the annotation key used in daemonset spec to indicate
// which upgrade strategy is selected. Currently "ota" and "auto" are supported.
UpgradeAnnotation = "apps.openyurt.io/upgrade-strategy"
// UpdateAnnotation is the annotation key used in daemonset spec to indicate
// which update strategy is selected. Currently, "ota" and "auto" are supported.
UpdateAnnotation = "apps.openyurt.io/update-strategy"

OTAUpgrade = "ota"
AutoUpgrade = "auto"
)
OTAUpdate = "ota"
AutoUpdate = "auto"

// PodUpgradableAnnotation is the annotation key added to pods to indicate
// whether a new version is available for upgrade.
// This annotation will only be added if the upgrade strategy is "apps.openyurt.io/upgrade-strategy":"ota".
const PodUpgradableAnnotation = "apps.openyurt.io/pod-upgradable"
// PodUpdatableAnnotation is the annotation key added to pods to indicate
// whether a new version is available for update.
// This annotation will only be added if the update strategy is "apps.openyurt.io/update-strategy":"ota".
PodUpdatableAnnotation = "apps.openyurt.io/pod-updatable"

const MaxUnavailableAnnotation = "apps.openyurt.io/max-unavailable"
// MaxUnavailableAnnotation is the annotation key added to daemonset to indicate
// the max unavailable pods number. It's used with "apps.openyurt.io/update-strategy=auto".
MaxUnavailableAnnotation = "apps.openyurt.io/max-unavailable"

const (
// BurstReplicas is a rate limiter for booting pods on a lot of pods.
// The value of 250 is chosen b/c values that are too high can cause registry DoS issues.
BurstReplicas = 250
Expand Down Expand Up @@ -143,7 +143,7 @@ func NewController(kc client.Interface, daemonsetInformer appsinformers.DaemonSe
func (c *Controller) enqueueDaemonSet(ds *appsv1.DaemonSet) {
key, err := cache.MetaNamespaceKeyFunc(ds)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
return
}

Expand All @@ -153,7 +153,7 @@ func (c *Controller) enqueueDaemonSet(ds *appsv1.DaemonSet) {

func (c *Controller) deletePod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// When a deletion is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new daemonset will not be woken up till the periodic
Expand Down Expand Up @@ -220,14 +220,14 @@ func (c *Controller) resolveControllerRef(namespace string, controllerRef *metav
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

klog.Info("Starting pod upgrade controller")
defer klog.Info("Shutting down pod upgrade controller")
klog.Info("Starting daemonPodUpdater controller")
defer klog.Info("Shutting down daemonPodUpdater controller")
defer c.daemonsetWorkqueue.ShutDown()

//synchronize the cache before starting to process events
if !cache.WaitForCacheSync(stopCh, c.daemonsetSynced, c.nodeSynced,
c.podSynced) {
klog.Error("sync podupgrade controller timeout")
klog.Error("sync daemonPodUpdater controller timeout")
}

for i := 0; i < threadiness; i++ {
Expand All @@ -253,10 +253,10 @@ func (c *Controller) runDaemonsetWorker() {

func (c *Controller) syncDaemonsetHandler(key string) error {
defer func() {
klog.V(4).Infof("Finish syncing pod upgrade request %q", key)
klog.V(4).Infof("Finish syncing daemonPodUpdater request %q", key)
}()

klog.V(4).Infof("Start handler pod upgrade request %q", key)
klog.V(4).Infof("Start handler daemonPodUpdater request %q", key)

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down Expand Up @@ -290,19 +290,19 @@ func (c *Controller) syncDaemonsetHandler(key string) error {
}

// recheck required annotation
v, ok := ds.Annotations[UpgradeAnnotation]
v, ok := ds.Annotations[UpdateAnnotation]
if !ok {
return fmt.Errorf("won't sync daemonset %q without annotation 'apps.openyurt.io/upgrade-strategy'", ds.Name)
return fmt.Errorf("won't sync daemonset %q without annotation 'apps.openyurt.io/update-strategy'", ds.Name)
}

switch v {
case OTAUpgrade:
if err := c.checkOTAUpgrade(ds, pods); err != nil {
case OTAUpdate:
if err := c.checkOTAUpdate(ds, pods); err != nil {
return err
}

case AutoUpgrade:
if err := c.autoUpgrade(ds); err != nil {
case AutoUpdate:
if err := c.autoUpdate(ds); err != nil {
return err
}
default:
Expand All @@ -312,20 +312,20 @@ func (c *Controller) syncDaemonsetHandler(key string) error {
return nil
}

// checkOTAUpgrade compare every pod to its owner daemonset to check if pod is upgradable
// If pod is in line with the latest daemonset version, set annotation "apps.openyurt.io/pod-upgradable" to "true"
// while not, set annotation "apps.openyurt.io/pod-upgradable" to "false"
func (c *Controller) checkOTAUpgrade(ds *appsv1.DaemonSet, pods []*corev1.Pod) error {
// checkOTAUpdate compare every pod to its owner daemonset to check if pod is updatable
// If pod is in line with the latest daemonset version, set annotation "apps.openyurt.io/pod-updatable" to "true"
// while not, set annotation "apps.openyurt.io/pod-updatable" to "false"
func (c *Controller) checkOTAUpdate(ds *appsv1.DaemonSet, pods []*corev1.Pod) error {
for _, pod := range pods {
if err := SetPodUpgradeAnnotation(c.kubeclientset, ds, pod); err != nil {
if err := SetPodUpdateAnnotation(c.kubeclientset, ds, pod); err != nil {
return err
}
}
return nil
}

// autoUpgrade identifies the set of old pods to delete
func (c *Controller) autoUpgrade(ds *appsv1.DaemonSet) error {
// autoUpdate identifies the set of old pods to delete
func (c *Controller) autoUpdate(ds *appsv1.DaemonSet) error {
nodeToDaemonPods, err := c.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
Expand Down Expand Up @@ -363,7 +363,7 @@ func (c *Controller) autoUpgrade(ds *appsv1.DaemonSet) error {
// the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
numUnavailable++
case newPod != nil:
// this pod is up to date, check its availability
// this pod is up-to-date, check its availability
if !k8sutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: time.Now()}) {
// an unavailable new pod is counted against maxUnavailable
numUnavailable++
Expand Down Expand Up @@ -416,7 +416,7 @@ func (c *Controller) getNodesToDaemonPods(ds *appsv1.DaemonSet) (map[string][]*c
// Group Pods by Node name.
nodeToDaemonPods := make(map[string][]*corev1.Pod)
for _, pod := range pods {
nodeName, err := k8sutil.GetTargetNodeName(pod)
nodeName, err := GetTargetNodeName(pod)
if err != nil {
klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
pod.Namespace, pod.Name, ds.Namespace, ds.Name)
Expand Down Expand Up @@ -464,7 +464,7 @@ func (c *Controller) syncPodsOnNodes(ds *appsv1.DaemonSet, podsToDelete []string
utilruntime.HandleError(err)
}
}
klog.Infof("Auto upgrade pod %v/%v", ds.Name, podsToDelete[ix])
klog.Infof("Auto update pod %v/%v", ds.Name, podsToDelete[ix])
}(i)
}
deleteWait.Wait()
Expand Down
Loading

0 comments on commit 16979d8

Please sign in to comment.