verify upgrade status by worker pod
Signed-off-by: hxcGit <[email protected]>
xavier-hou committed Apr 2, 2023
1 parent 6e16954 commit 4900806
Showing 4 changed files with 74 additions and 156 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/apps/v1alpha1/staticpod_types.go
@@ -66,8 +66,8 @@ type StaticPodStatus struct {
// The total number of nodes that are running the static pod.
TotalNumber int32 `json:"totalNumber"`

- // The number of nodes that are running static pods which need to be upgraded.
- DesiredNumber int32 `json:"desiredNumber"`
+ // The number of nodes that are running ready static pod.
+ ReadyNumber int32 `json:"readyNumber"`

// The number of nodes that are running updated static pod.
UpgradedNumber int32 `json:"upgradedNumber"`
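With DesiredNumber dropped, the status now reports the ready and upgraded counts side by side. A minimal sketch of how a consumer might read the three fields (the local struct here merely mirrors the counters shown above for illustration; the real type lives in pkg/apis/apps/v1alpha1):

package main

import "fmt"

// StaticPodStatus mirrors only the three counters visible in the diff above.
type StaticPodStatus struct {
	TotalNumber    int32 `json:"totalNumber"`
	ReadyNumber    int32 `json:"readyNumber"`
	UpgradedNumber int32 `json:"upgradedNumber"`
}

func main() {
	s := StaticPodStatus{TotalNumber: 3, ReadyNumber: 3, UpgradedNumber: 2}
	fmt.Printf("upgraded %d/%d static pods (%d ready)\n", s.UpgradedNumber, s.TotalNumber, s.ReadyNumber)
}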
187 changes: 53 additions & 134 deletions pkg/controller/staticpod/staticpod_controller.go
@@ -20,7 +20,6 @@ import (
"context"
"flag"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
kerr "k8s.io/apimachinery/pkg/api/errors"
@@ -75,7 +74,7 @@ const (
UpgradeWorkerPodPrefix = "yurt-static-pod-upgrade-worker-"
UpgradeWorkerContainerName = "upgrade-worker"

ArgTmpl = "/usr/local/bin/yurt-static-pod-upgrade --manifest=%s --mode=%s"
ArgTmpl = "/usr/local/bin/node-servant static-pod-upgrade --name=%s --namespace=%s --manifest=%s --hash=%s --mode=%s"
)

// upgradeWorker is the pod template used for static pod upgrade
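For context, here is how the new template renders once createUpgradeWorker (shown further down) substitutes its arguments. A runnable sketch; every value below is a made-up example, not controller output:

package main

import "fmt"

const ArgTmpl = "/usr/local/bin/node-servant static-pod-upgrade --name=%s --namespace=%s --manifest=%s --hash=%s --mode=%s"

func main() {
	// The controller fills in util.Hyphen(instance.Name, node), instance.Namespace,
	// instance.Spec.StaticPodManifest, the latest spec hash, and the upgrade mode.
	fmt.Printf(ArgTmpl+"\n", "nginx-node1", "kube-system", "nginx", "6d4f79c8", "auto")
}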
@@ -254,7 +253,7 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
// Fetch the StaticPod instance
instance := &appsv1alpha1.StaticPod{}
if err := r.Get(context.TODO(), request.NamespacedName, instance); err != nil {
klog.Errorf("Fail to get StaticPod %v, %v", request.NamespacedName.Name, err)
klog.Errorf("Fail to get StaticPod %v, %v", request.NamespacedName, err)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

@@ -265,12 +264,6 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
var (
// totalNumber represents the total number of nodes running the target static pod
totalNumber int32

- // desiredNumber represents the desired upgraded number of nodes running the target static pod
- // In auto mode: it's the number of ready nodes running the target static pod
- // In ota mode: it's equal to totalNumber
- desiredNumber int32

// upgradedNumber represents the number of nodes that have been upgraded
upgradedNumber int32
)
@@ -279,14 +272,14 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
upgradeInfos, err := upgradeinfo.New(r.Client, instance, UpgradeWorkerPodPrefix)
if err != nil {
klog.Errorf(Format("Fail to get static pod and worker pod upgrade info for nodes of StaticPod %v, %v",
- request.NamespacedName.Name, err))
+ request.NamespacedName, err))
return ctrl.Result{}, err
}
totalNumber = int32(len(upgradeInfos))
// There are no nodes running target static pods in the cluster
if totalNumber == 0 {
klog.Infof(Format("No static pods need to be upgraded of StaticPod %v", request.NamespacedName.Name))
return r.updateStaticPodStatus(instance, totalNumber, totalNumber, totalNumber)
klog.Infof(Format("No static pods need to be upgraded of StaticPod %v", request.NamespacedName))
return r.updateStaticPodStatus(instance, totalNumber, totalNumber)
}

// The latest hash value for static pod spec
@@ -300,103 +293,100 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
// The above hash value will be added to the annotation
latestManifest, err := util.GenStaticPodManifest(&instance.Spec.Template, latestHash)
if err != nil {
klog.Errorf(Format("Fail to generate static pod manifest of StaticPod %v, %v", request.NamespacedName.Name, err))
klog.Errorf(Format("Fail to generate static pod manifest of StaticPod %v, %v", request.NamespacedName, err))
return ctrl.Result{}, err
}

// Sync the corresponding configmap to the latest state
if err := r.syncConfigMap(instance, latestHash, latestManifest); err != nil {
klog.Errorf(Format("Fail to sync the corresponding configmap of StaticPod %v, %v", request.NamespacedName.Name, err))
klog.Errorf(Format("Fail to sync the corresponding configmap of StaticPod %v, %v", request.NamespacedName, err))
return ctrl.Result{}, err
}

// Complete upgrade info
{
// Count the number of upgraded nodes
- upgradedNumber = setUpgradeNeededInfos(upgradeInfos, latestHash)
+ upgradedNumber = upgradeinfo.SetUpgradeNeededInfos(upgradeInfos, latestHash)

// Set node ready info
if err := checkReadyNodes(r.Client, upgradeInfos); err != nil {
klog.Errorf(Format("Fail to check node ready status of StaticPod %v,%v", request.NamespacedName.Name, err))
klog.Errorf(Format("Fail to check node ready status of StaticPod %v,%v", request.NamespacedName, err))
return ctrl.Result{}, err
}
}

// Sync worker pods
- var allSucceeded bool
+ allSucceeded := true
deletePods := make([]*corev1.Pod, 0)
- succeededNodes := make([]string, 0)
{
- // Deal with worker pods
- allSucceeded, err = r.checkWorkerPods(upgradeInfos, latestHash, &deletePods, &succeededNodes)
- if err != nil {
- klog.Errorf(Format("Fail to continue upgrade, cause worker pod of StaticPod %v in node %v failed",
- request.NamespacedName.Name, err.Error()))
- return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber)
- }
-
- // Verify succeededNodes
- if instance.Spec.UpgradeStrategy.Type == appsv1alpha1.AutoStaticPodUpgradeStrategyType {
- ok, err := r.verifySucceededNodes(instance, succeededNodes, latestHash)
- if util.IsFailedError(err) {
- klog.Errorf(Format("Fail to verify succeededNodes of StaticPod %v, %v", request.NamespacedName.Name, err))
- return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber)
+ for node, info := range upgradeInfos {
+ if info.WorkerPod == nil {
+ continue
}

- if err != nil {
- klog.Errorf(Format("Fail to verify succeededNodes of StaticPod %v, %v", request.NamespacedName.Name, err))
- return reconcile.Result{}, err
+ hash := info.WorkerPod.Annotations[StaticPodHashAnnotation]
+ // If the worker pod is not up-to-date, then it can be recreated directly
+ if hash != latestHash {
+ deletePods = append(deletePods, info.WorkerPod)
+ continue
}

- if !ok {
- return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+ // If the worker pod is up-to-date, there are three possible situations
+ // 1. The worker pod is failed, then some irreparable failure has occurred. Just stop reconcile and update status
+ // 2. The worker pod is succeeded, then this node must be up-to-date. Just delete this worker pod
+ // 3. The worker pod is running, pending or unknown, then just wait
+ switch info.WorkerPod.Status.Phase {
+ case corev1.PodFailed:
+ klog.Errorf("Fail to continue upgrade, cause worker pod %s of StaticPod %v in node %s failed",
+ info.WorkerPod.Name, request.NamespacedName, node)
+ return reconcile.Result{},
+ fmt.Errorf("fail to continue upgrade, cause worker pod %s of StaticPod %v in node %s failed",
+ info.WorkerPod.Name, request.NamespacedName, node)
+ case corev1.PodSucceeded:
+ deletePods = append(deletePods, info.WorkerPod)
+ default:
+ // In this node, the latest worker pod is still running, and we don't need to create new worker for it.
+ info.WorkerPodRunning = true
+ allSucceeded = false
}
}
}

// Clean up unused pods
if err := r.removeUnusedPods(deletePods); err != nil {
klog.Errorf(Format("Fail to remove unused pods of StaticPod %v, %v", request.NamespacedName.Name, err))
klog.Errorf(Format("Fail to remove unused pods of StaticPod %v, %v", request.NamespacedName, err))
return reconcile.Result{}, err
}

// If all nodes have been upgraded, just return
// Put this here because we need to clean up the worker pods first
if totalNumber == upgradedNumber {
klog.Infof(Format("All static pods have been upgraded of StaticPod %v", request.NamespacedName.Name))
return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber)
klog.Infof(Format("All static pods have been upgraded of StaticPod %v", request.NamespacedName))
return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
}

switch instance.Spec.UpgradeStrategy.Type {
// Auto Upgrade is to automate the upgrade process for the target static pods on ready nodes
// It supports rolling update and the max-unavailable number can be specified by users
case appsv1alpha1.AutoStaticPodUpgradeStrategyType:
- // In auto upgrade mode, desiredNumber is the number of ready nodes
- desiredNumber = int32(len(upgradeinfo.ReadyNodes(upgradeInfos)))
- // This means that all the desired nodes are upgraded. It's considered successful.
- if desiredNumber == upgradedNumber {
- return r.updateStaticPodStatus(instance, totalNumber, desiredNumber, upgradedNumber)
- }

if !allSucceeded {
klog.V(5).Infof(Format("Wait last round auto upgrade to finish of StaticPod %v", request.NamespacedName.Name))
return r.updateStaticPodStatus(instance, totalNumber, desiredNumber, upgradedNumber)
klog.V(5).Infof(Format("Wait last round auto upgrade to finish of StaticPod %v", request.NamespacedName))
return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
}

if err := r.autoUpgrade(instance, upgradeInfos, latestHash); err != nil {
klog.Errorf(Format("Fail to auto upgrade of StaticPod %v, %v", request.NamespacedName.Name, err))
klog.Errorf(Format("Fail to auto upgrade of StaticPod %v, %v", request.NamespacedName, err))
return ctrl.Result{}, err
}
- return r.updateStaticPodStatus(instance, totalNumber, desiredNumber, upgradedNumber)
+ return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)

// OTA Upgrade can help users control the timing of static pods upgrade
// It will set PodNeedUpgrade condition and work with YurtHub component
case appsv1alpha1.OTAStaticPodUpgradeStrategyType:
if err := r.otaUpgrade(instance, upgradeInfos, latestHash); err != nil {
klog.Errorf(Format("Fail to ota upgrade of StaticPod %v, %v", request.NamespacedName.Name, err))
klog.Errorf(Format("Fail to ota upgrade of StaticPod %v, %v", request.NamespacedName, err))
return ctrl.Result{}, err
}
- return r.updateStaticPodStatus(instance, totalNumber, totalNumber, upgradedNumber)
+ return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
}

return ctrl.Result{}, nil
@@ -443,62 +433,6 @@ func (r *ReconcileStaticPod) syncConfigMap(instance *appsv1alpha1.StaticPod, has
return nil
}

- // syncWorkerPod synchronizes the worker pods for each node which has
- func (r *ReconcileStaticPod) checkWorkerPods(infos map[string]*upgradeinfo.UpgradeInfo, latestHash string,
- deletePods *[]*corev1.Pod, succeededNodes *[]string) (bool, error) {
- allSucceeded := true
-
- for node, info := range infos {
- if info.WorkerPod == nil {
- continue
- }
-
- hash := info.WorkerPod.Annotations[StaticPodHashAnnotation]
- // If the worker pod is not up-to-date, then it can be recreated directly
- if hash != latestHash {
- *deletePods = append(*deletePods, info.WorkerPod)
- continue
- }
- // If the worker pod is up-to-date, there are three possible situations
- // 1. The worker pod is failed, then some irreparable failure has occurred. Just stop reconcile and update status
- // 2. The worker pod is succeeded, then this node must be up-to-date. Just delete this worker pod
- // 3. The worker pod is running, pending or unknown, then just wait
- switch info.WorkerPod.Status.Phase {
- case corev1.PodFailed:
- return false, util.NewFailedError(node)
- case corev1.PodSucceeded:
- *succeededNodes = append(*succeededNodes, node)
- *deletePods = append(*deletePods, info.WorkerPod)
- default:
- // In this node, the latest worker pod is still running, and we don't need to create new worker for it.
- info.WorkerPodRunning = true
- allSucceeded = false
- }
- }
-
- return allSucceeded, nil
- }
-
- // verifyUpgrade verify that whether the new static pods on the given nodes are ready
- func (r *ReconcileStaticPod) verifySucceededNodes(instance *appsv1alpha1.StaticPod, nodes []string, hash string) (bool, error) {
- pod := &corev1.Pod{}
- for _, node := range nodes {
- if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: util.Hyphen(instance.Name, node)}, pod); err != nil {
- return false, err
- }
-
- if pod.Status.Phase == corev1.PodFailed {
- return false, util.NewFailedError(node)
- }
-
- if pod.Status.Phase != corev1.PodRunning || pod.Annotations[StaticPodHashAnnotation] != hash {
- klog.V(5).Infof("Fail to verify new static pod on %v", node)
- return false, nil
- }
- }
- return true, nil
- }

// autoUpgrade automatically rolling upgrade the target static pods in cluster
func (r *ReconcileStaticPod) autoUpgrade(instance *appsv1alpha1.StaticPod, infos map[string]*upgradeinfo.UpgradeInfo, hash string) error {
// readyUpgradeWaitingNodes represents nodes that need to create worker pods
@@ -520,7 +454,8 @@ func (r *ReconcileStaticPod) autoUpgrade(instance *appsv1alpha1.StaticPod, infos
}

readyUpgradeWaitingNodes = readyUpgradeWaitingNodes[:max]
- if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash, string(appsv1alpha1.AutoStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
+ if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash,
+ string(appsv1alpha1.AutoStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
return err
}
return nil
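The slice above is what bounds each rolling-update round: only the first max waiting nodes get a worker pod, and the next batch starts on a later reconcile once allSucceeded holds again. A distilled sketch of that window, assuming max is the user-specified max-unavailable value clamped to the waiting list (the helper name is hypothetical, not from this codebase):

package main

import "fmt"

// nextBatch returns at most maxUnavailable nodes from the waiting list.
func nextBatch(waiting []string, maxUnavailable int) []string {
	if maxUnavailable > len(waiting) {
		maxUnavailable = len(waiting)
	}
	return waiting[:maxUnavailable]
}

func main() {
	fmt.Println(nextBatch([]string{"node-a", "node-b", "node-c"}, 2)) // [node-a node-b]
}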
@@ -547,7 +482,8 @@ func (r *ReconcileStaticPod) otaUpgrade(instance *appsv1alpha1.StaticPod, infos

// Create worker pod to issue the latest manifest to ready node
readyUpgradeWaitingNodes := upgradeinfo.OTAReadyUpgradeWaitingNodes(infos, hash)
- if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash, string(appsv1alpha1.OTAStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
+ if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash,
+ string(appsv1alpha1.OTAStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
return err
}

@@ -560,7 +496,7 @@

// setLatestManifestHash set the latest manifest hash value to target static pod annotation
// TODO: In ota mode, it's hard for controller to check whether the latest manifest file has been issued to nodes
- // TODO: Use annotation `openyurt.io/ota-manifest-version` to indicate the version of manifest issued to nodes
+ // TODO: Use annotation `openyurt.io/ota-latest-version` to indicate the version of manifest issued to nodes
func (r *ReconcileStaticPod) setLatestManifestHash(instance *appsv1alpha1.StaticPod, nodes []string, hash string) error {
pod := &corev1.Pod{}
for _, node := range nodes {
@@ -606,7 +542,8 @@ func createUpgradeWorker(c client.Client, instance *appsv1alpha1.StaticPod, node
},
},
})
- pod.Spec.Containers[0].Args = []string{fmt.Sprintf(ArgTmpl, instance.Spec.StaticPodManifest, mode)}
+ pod.Spec.Containers[0].Args = []string{fmt.Sprintf(ArgTmpl, util.Hyphen(instance.Name, node), instance.Namespace,
+ instance.Spec.StaticPodManifest, hash, mode)}
pod.Spec.Containers[0].Image = img
if err := controllerutil.SetControllerReference(instance, pod, c.Scheme()); err != nil {
return err
@@ -621,24 +558,6 @@ func createUpgradeWorker(c client.Client, instance *appsv1alpha1.StaticPod, node
return nil
}

- // setUpgradeNeededInfo sets `UpgradeNeeded` flag and counts the number of upgraded nodes
- func setUpgradeNeededInfos(infos map[string]*upgradeinfo.UpgradeInfo, latestHash string) int32 {
- var upgradedNumber int32
-
- for _, info := range infos {
- if info.StaticPod != nil {
- if info.StaticPod.Annotations[StaticPodHashAnnotation] != latestHash {
- // Indicate the static pod in this node needs to be upgraded
- info.UpgradeNeeded = true
- continue
- }
- upgradedNumber++
- }
- }
-
- return upgradedNumber
- }

// checkReadyNodes checks and sets the ready status for every node which has the target static pod
func checkReadyNodes(client client.Client, infos map[string]*upgradeinfo.UpgradeInfo) error {
for node, info := range infos {
@@ -652,9 +571,9 @@ func checkReadyNodes(client client.Client, infos map[string]*upgradeinfo.Upgrade
}

// updateStatus set the status of instance to the given values
- func (r *ReconcileStaticPod) updateStaticPodStatus(instance *appsv1alpha1.StaticPod, totalNum, desiredNum, upgradedNum int32) (reconcile.Result, error) {
+ func (r *ReconcileStaticPod) updateStaticPodStatus(instance *appsv1alpha1.StaticPod, totalNum, upgradedNum int32) (reconcile.Result, error) {
instance.Status.TotalNumber = totalNum
- instance.Status.DesiredNumber = desiredNum
+ instance.Status.ReadyNumber = upgradedNum
instance.Status.UpgradedNumber = upgradedNum

if err := r.Client.Status().Update(context.TODO(), instance); err != nil {
19 changes: 19 additions & 0 deletions pkg/controller/staticpod/upgradeinfo/upgrade_info.go
@@ -28,6 +28,7 @@ import (
)

const (
+ StaticPodHashAnnotation = "openyurt.io/static-pod-hash"
OTALatestManifestAnnotation = "openyurt.io/ota-latest-version"
)

@@ -171,3 +172,21 @@ func UpgradedNodes(infos map[string]*UpgradeInfo) []string {
}
return nodes
}

+ // SetUpgradeNeededInfo sets `UpgradeNeeded` flag and counts the number of upgraded nodes
+ func SetUpgradeNeededInfos(infos map[string]*UpgradeInfo, latestHash string) int32 {
+ var upgradedNumber int32
+
+ for _, info := range infos {
+ if info.StaticPod != nil {
+ if info.StaticPod.Annotations[StaticPodHashAnnotation] != latestHash {
+ // Indicate the static pod in this node needs to be upgraded
+ info.UpgradeNeeded = true
+ continue
+ }
+ upgradedNumber++
+ }
+ }
+
+ return upgradedNumber
+ }
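Since the helper is now exported from the upgradeinfo package, callers outside the controller can reuse it. A usage sketch in test form, assuming only the UpgradeInfo fields visible in this diff (StaticPod, UpgradeNeeded) and the usual github.com/openyurtio/openyurt module path:

package upgradeinfo_test

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/openyurtio/openyurt/pkg/controller/staticpod/upgradeinfo"
)

func TestSetUpgradeNeededInfos(t *testing.T) {
	pod := func(hash string) *corev1.Pod {
		return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{upgradeinfo.StaticPodHashAnnotation: hash},
		}}
	}
	infos := map[string]*upgradeinfo.UpgradeInfo{
		"node-a": {StaticPod: pod("v2")}, // already on the latest hash
		"node-b": {StaticPod: pod("v1")}, // stale, should be flagged
	}
	if got := upgradeinfo.SetUpgradeNeededInfos(infos, "v2"); got != 1 {
		t.Fatalf("upgraded count = %d, want 1", got)
	}
	if !infos["node-b"].UpgradeNeeded {
		t.Fatal("expected node-b to be marked UpgradeNeeded")
	}
}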
