diff --git a/pkg/apis/apps/v1alpha1/staticpod_types.go b/pkg/apis/apps/v1alpha1/staticpod_types.go
index dc62153238c..a8185562539 100644
--- a/pkg/apis/apps/v1alpha1/staticpod_types.go
+++ b/pkg/apis/apps/v1alpha1/staticpod_types.go
@@ -66,8 +66,8 @@ type StaticPodStatus struct {
 	// The total number of nodes that are running the static pod.
 	TotalNumber int32 `json:"totalNumber"`
 
-	// The number of nodes that are running static pods which need to be upgraded.
-	DesiredNumber int32 `json:"desiredNumber"`
+	// The number of nodes that are running ready static pods.
+	ReadyNumber int32 `json:"readyNumber"`
 
 	// The number of nodes that are running updated static pod.
 	UpgradedNumber int32 `json:"upgradedNumber"`
diff --git a/pkg/controller/staticpod/staticpod_controller.go b/pkg/controller/staticpod/staticpod_controller.go
index b98ba144cf0..c458d7eed78 100644
--- a/pkg/controller/staticpod/staticpod_controller.go
+++ b/pkg/controller/staticpod/staticpod_controller.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"flag"
 	"fmt"
-	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	kerr "k8s.io/apimachinery/pkg/api/errors"
@@ -75,7 +74,7 @@ const (
 	UpgradeWorkerPodPrefix     = "yurt-static-pod-upgrade-worker-"
 	UpgradeWorkerContainerName = "upgrade-worker"
 
-	ArgTmpl = "/usr/local/bin/yurt-static-pod-upgrade --manifest=%s --mode=%s"
+	ArgTmpl = "/usr/local/bin/node-servant static-pod-upgrade --name=%s --namespace=%s --manifest=%s --hash=%s --mode=%s"
 )
 
 // upgradeWorker is the pod template used for static pod upgrade
@@ -254,7 +253,7 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
 	// Fetch the StaticPod instance
 	instance := &appsv1alpha1.StaticPod{}
 	if err := r.Get(context.TODO(), request.NamespacedName, instance); err != nil {
-		klog.Errorf("Fail to get StaticPod %v, %v", request.NamespacedName.Name, err)
+		klog.Errorf("Fail to get StaticPod %v, %v", request.NamespacedName, err)
 		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
 
@@ -265,12 +264,6 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
 	var (
 		// totalNumber represents the total number of nodes running the target static pod
 		totalNumber int32
-
-		// desiredNumber represents the desired upgraded number of nodes running the target static pod
-		// In auto mode: it's the number of ready nodes running the target static pod
-		// In ota mode: it's equal to totalNumber
-		desiredNumber int32
-
 		// upgradedNumber represents the number of nodes that have been upgraded
 		upgradedNumber int32
 	)
@@ -279,14 +272,14 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
 	upgradeInfos, err := upgradeinfo.New(r.Client, instance, UpgradeWorkerPodPrefix)
 	if err != nil {
 		klog.Errorf(Format("Fail to get static pod and worker pod upgrade info for nodes of StaticPod %v, %v",
-			request.NamespacedName.Name, err))
+			request.NamespacedName, err))
 		return ctrl.Result{}, err
 	}
 	totalNumber = int32(len(upgradeInfos))
 	// There are no nodes running target static pods in the cluster
 	if totalNumber == 0 {
-		klog.Infof(Format("No static pods need to be upgraded of StaticPod %v", request.NamespacedName.Name))
-		return r.updateStaticPodStatus(instance, totalNumber, totalNumber, totalNumber)
+		klog.Infof(Format("No static pods need to be upgraded of StaticPod %v", request.NamespacedName))
+		return r.updateStaticPodStatus(instance, totalNumber, totalNumber)
 	}
 
 	// The latest hash value for static pod spec
@@ -300,103 +293,100 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ
 	// The above hash value will be added to the annotation
 	latestManifest, err := util.GenStaticPodManifest(&instance.Spec.Template, latestHash)
 	if err != nil {
-		klog.Errorf(Format("Fail to generate static pod manifest of StaticPod %v, %v", request.NamespacedName.Name, err))
+		klog.Errorf(Format("Fail to generate static pod manifest of StaticPod %v, %v", request.NamespacedName, err))
 		return ctrl.Result{}, err
 	}
 
 	// Sync the corresponding configmap to the latest state
 	if err := r.syncConfigMap(instance, latestHash, latestManifest); err != nil {
-		klog.Errorf(Format("Fail to sync the corresponding configmap of StaticPod %v, %v", request.NamespacedName.Name, err))
+		klog.Errorf(Format("Fail to sync the corresponding configmap of StaticPod %v, %v", request.NamespacedName, err))
 		return ctrl.Result{}, err
 	}
 
 	// Complete upgrade info
 	{
 		// Count the number of upgraded nodes
-		upgradedNumber = setUpgradeNeededInfos(upgradeInfos, latestHash)
+		upgradedNumber = upgradeinfo.SetUpgradeNeededInfos(upgradeInfos, latestHash)
 
 		// Set node ready info
 		if err := checkReadyNodes(r.Client, upgradeInfos); err != nil {
-			klog.Errorf(Format("Fail to check node ready status of StaticPod %v,%v", request.NamespacedName.Name, err))
+			klog.Errorf(Format("Fail to check node ready status of StaticPod %v, %v", request.NamespacedName, err))
			return ctrl.Result{}, err
 		}
 	}
 
 	// Sync worker pods
-	var allSucceeded bool
+	allSucceeded := true
 	deletePods := make([]*corev1.Pod, 0)
-	succeededNodes := make([]string, 0)
 	{
-		// Deal with worker pods
-		allSucceeded, err = r.checkWorkerPods(upgradeInfos, latestHash, &deletePods, &succeededNodes)
-		if err != nil {
-			klog.Errorf(Format("Fail to continue upgrade, cause worker pod of StaticPod %v in node %v failed",
-				request.NamespacedName.Name, err.Error()))
-			return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber)
-		}
-
-		// Verify succeededNodes
-		if instance.Spec.UpgradeStrategy.Type == appsv1alpha1.AutoStaticPodUpgradeStrategyType {
-			ok, err := r.verifySucceededNodes(instance, succeededNodes, latestHash)
-			if util.IsFailedError(err) {
-				klog.Errorf(Format("Fail to verify succeededNodes of StaticPod %v, %v", request.NamespacedName.Name, err))
-				return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber)
+		for node, info := range upgradeInfos {
+			if info.WorkerPod == nil {
+				continue
 			}
-			if err != nil {
-				klog.Errorf(Format("Fail to verify succeededNodes of StaticPod %v, %v", request.NamespacedName.Name, err))
-				return reconcile.Result{}, err
+			hash := info.WorkerPod.Annotations[StaticPodHashAnnotation]
+			// If the worker pod is not up-to-date, then it can be recreated directly
+			if hash != latestHash {
+				deletePods = append(deletePods, info.WorkerPod)
+				continue
 			}
-
-			if !ok {
-				return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+			// If the worker pod is up-to-date, there are three possible situations
+			// 1. The worker pod is failed, then some irreparable failure has occurred. Just stop reconcile and update status
+			// 2. The worker pod is succeeded, then this node must be up-to-date. Just delete this worker pod
+			// 3. The worker pod is running, pending or unknown, then just wait
+			switch info.WorkerPod.Status.Phase {
+			case corev1.PodFailed:
+				klog.Errorf("Fail to continue upgrade, cause worker pod %s of StaticPod %v in node %s failed",
+					info.WorkerPod.Name, request.NamespacedName, node)
+				return reconcile.Result{},
+					fmt.Errorf("fail to continue upgrade, cause worker pod %s of StaticPod %v in node %s failed",
+						info.WorkerPod.Name, request.NamespacedName, node)
+			case corev1.PodSucceeded:
+				deletePods = append(deletePods, info.WorkerPod)
+			default:
+				// In this node, the latest worker pod is still running, and we don't need to create a new worker for it.
+				info.WorkerPodRunning = true
+				allSucceeded = false
 			}
 		}
 	}
 
 	// Clean up unused pods
 	if err := r.removeUnusedPods(deletePods); err != nil {
-		klog.Errorf(Format("Fail to remove unused pods of StaticPod %v, %v", request.NamespacedName.Name, err))
+		klog.Errorf(Format("Fail to remove unused pods of StaticPod %v, %v", request.NamespacedName, err))
 		return reconcile.Result{}, err
 	}
 
 	// If all nodes have been upgraded, just return
 	// Put this here because we need to clean up the worker pods first
 	if totalNumber == upgradedNumber {
-		klog.Infof(Format("All static pods have been upgraded of StaticPod %v", request.NamespacedName.Name))
-		return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber)
+		klog.Infof(Format("All static pods have been upgraded of StaticPod %v", request.NamespacedName))
+		return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
 	}
 
 	switch instance.Spec.UpgradeStrategy.Type {
 	// Auto Upgrade is to automate the upgrade process for the target static pods on ready nodes
 	// It supports rolling update and the max-unavailable number can be specified by users
 	case appsv1alpha1.AutoStaticPodUpgradeStrategyType:
-		// In auto upgrade mode, desiredNumber is the number of ready nodes
-		desiredNumber = int32(len(upgradeinfo.ReadyNodes(upgradeInfos)))
-		// This means that all the desired nodes are upgraded. It's considered successful.
-		if desiredNumber == upgradedNumber {
-			return r.updateStaticPodStatus(instance, totalNumber, desiredNumber, upgradedNumber)
-		}
-
 		if !allSucceeded {
-			klog.V(5).Infof(Format("Wait last round auto upgrade to finish of StaticPod %v", request.NamespacedName.Name))
-			return r.updateStaticPodStatus(instance, totalNumber, desiredNumber, upgradedNumber)
+			klog.V(5).Infof(Format("Wait last round auto upgrade to finish of StaticPod %v", request.NamespacedName))
+			return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
 		}
 
 		if err := r.autoUpgrade(instance, upgradeInfos, latestHash); err != nil {
-			klog.Errorf(Format("Fail to auto upgrade of StaticPod %v, %v", request.NamespacedName.Name, err))
+			klog.Errorf(Format("Fail to auto upgrade of StaticPod %v, %v", request.NamespacedName, err))
 			return ctrl.Result{}, err
 		}
-		return r.updateStaticPodStatus(instance, totalNumber, desiredNumber, upgradedNumber)
+		return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
 
 	// OTA Upgrade can help users control the timing of static pods upgrade
 	// It will set PodNeedUpgrade condition and work with YurtHub component
 	case appsv1alpha1.OTAStaticPodUpgradeStrategyType:
 		if err := r.otaUpgrade(instance, upgradeInfos, latestHash); err != nil {
-			klog.Errorf(Format("Fail to ota upgrade of StaticPod %v, %v", request.NamespacedName.Name, err))
+			klog.Errorf(Format("Fail to ota upgrade of StaticPod %v, %v", request.NamespacedName, err))
 			return ctrl.Result{}, err
 		}
-		return r.updateStaticPodStatus(instance, totalNumber, totalNumber, upgradedNumber)
+		return r.updateStaticPodStatus(instance, totalNumber, upgradedNumber)
 	}
 
 	return ctrl.Result{}, nil
@@ -443,62 +433,6 @@ func (r *ReconcileStaticPod) syncConfigMap(instance *appsv1alpha1.StaticPod, has
 	return nil
 }
 
-// syncWorkerPod synchronizes the worker pods for each node which has
-func (r *ReconcileStaticPod) checkWorkerPods(infos map[string]*upgradeinfo.UpgradeInfo, latestHash string,
-	deletePods *[]*corev1.Pod, succeededNodes *[]string) (bool, error) {
-	allSucceeded := true
-
-	for node, info := range infos {
-		if info.WorkerPod == nil {
-			continue
-		}
-
-		hash := info.WorkerPod.Annotations[StaticPodHashAnnotation]
-		// If the worker pod is not up-to-date, then it can be recreated directly
-		if hash != latestHash {
-			*deletePods = append(*deletePods, info.WorkerPod)
-			continue
-		}
-		// If the worker pod is up-to-date, there are three possible situations
-		// 1. The worker pod is failed, then some irreparable failure has occurred. Just stop reconcile and update status
-		// 2. The worker pod is succeeded, then this node must be up-to-date. Just delete this worker pod
-		// 3. The worker pod is running, pending or unknown, then just wait
-		switch info.WorkerPod.Status.Phase {
-		case corev1.PodFailed:
-			return false, util.NewFailedError(node)
-		case corev1.PodSucceeded:
-			*succeededNodes = append(*succeededNodes, node)
-			*deletePods = append(*deletePods, info.WorkerPod)
-		default:
-			// In this node, the latest worker pod is still running, and we don't need to create new worker for it.
-			info.WorkerPodRunning = true
-			allSucceeded = false
-		}
-	}
-
-	return allSucceeded, nil
-}
-
-// verifyUpgrade verify that whether the new static pods on the given nodes are ready
-func (r *ReconcileStaticPod) verifySucceededNodes(instance *appsv1alpha1.StaticPod, nodes []string, hash string) (bool, error) {
-	pod := &corev1.Pod{}
-	for _, node := range nodes {
-		if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: util.Hyphen(instance.Name, node)}, pod); err != nil {
-			return false, err
-		}
-
-		if pod.Status.Phase == corev1.PodFailed {
-			return false, util.NewFailedError(node)
-		}
-
-		if pod.Status.Phase != corev1.PodRunning || pod.Annotations[StaticPodHashAnnotation] != hash {
-			klog.V(5).Infof("Fail to verify new static pod on %v", node)
-			return false, nil
-		}
-	}
-	return true, nil
-}
-
 // autoUpgrade automatically rolling upgrade the target static pods in cluster
 func (r *ReconcileStaticPod) autoUpgrade(instance *appsv1alpha1.StaticPod, infos map[string]*upgradeinfo.UpgradeInfo, hash string) error {
 	// readyUpgradeWaitingNodes represents nodes that need to create worker pods
@@ -520,7 +454,8 @@ func (r *ReconcileStaticPod) autoUpgrade(instance *appsv1alpha1.StaticPod, infos
 	}
 	readyUpgradeWaitingNodes = readyUpgradeWaitingNodes[:max]
 
-	if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash, string(appsv1alpha1.AutoStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
+	if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash,
+		string(appsv1alpha1.AutoStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
 		return err
 	}
 	return nil
@@ -547,7 +482,8 @@ func (r *ReconcileStaticPod) otaUpgrade(instance *appsv1alpha1.StaticPod, infos
 	// Create worker pod to issue the latest manifest to ready node
 	readyUpgradeWaitingNodes := upgradeinfo.OTAReadyUpgradeWaitingNodes(infos, hash)
 
-	if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash, string(appsv1alpha1.OTAStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
+	if err := createUpgradeWorker(r.Client, instance, readyUpgradeWaitingNodes, hash,
+		string(appsv1alpha1.OTAStaticPodUpgradeStrategyType), r.Configration.UpgradeWorkerImage); err != nil {
 		return err
 	}
 
@@ -560,7 +496,7 @@ func (r *ReconcileStaticPod) otaUpgrade(instance *appsv1alpha1.StaticPod, infos
 
 // setLatestManifestHash set the latest manifest hash value to target static pod annotation
 // TODO: In ota mode, it's hard for controller to check whether the latest manifest file has been issued to nodes
-// TODO: Use annotation `openyurt.io/ota-manifest-version` to indicate the version of manifest issued to nodes
+// TODO: Use annotation `openyurt.io/ota-latest-version` to indicate the version of manifest issued to nodes
 func (r *ReconcileStaticPod) setLatestManifestHash(instance *appsv1alpha1.StaticPod, nodes []string, hash string) error {
 	pod := &corev1.Pod{}
 	for _, node := range nodes {
@@ -606,7 +542,8 @@ func createUpgradeWorker(c client.Client, instance *appsv1alpha1.StaticPod, node
 				},
 			},
 		},
 	})
-	pod.Spec.Containers[0].Args = []string{fmt.Sprintf(ArgTmpl, instance.Spec.StaticPodManifest, mode)}
+	pod.Spec.Containers[0].Args = []string{fmt.Sprintf(ArgTmpl, util.Hyphen(instance.Name, node), instance.Namespace,
+		instance.Spec.StaticPodManifest, hash, mode)}
 	pod.Spec.Containers[0].Image = img
 	if err := controllerutil.SetControllerReference(instance, pod, c.Scheme()); err != nil {
 		return err
@@ -621,24 +558,6 @@ func createUpgradeWorker(c client.Client, instance *appsv1alpha1.StaticPod, node
 	return nil
 }
 
-// setUpgradeNeededInfo sets `UpgradeNeeded` flag and counts the number of upgraded nodes
-func setUpgradeNeededInfos(infos map[string]*upgradeinfo.UpgradeInfo, latestHash string) int32 {
-	var upgradedNumber int32
-
-	for _, info := range infos {
-		if info.StaticPod != nil {
-			if info.StaticPod.Annotations[StaticPodHashAnnotation] != latestHash {
-				// Indicate the static pod in this node needs to be upgraded
-				info.UpgradeNeeded = true
-				continue
-			}
-			upgradedNumber++
-		}
-	}
-
-	return upgradedNumber
-}
-
 // checkReadyNodes checks and sets the ready status for every node which has the target static pod
 func checkReadyNodes(client client.Client, infos map[string]*upgradeinfo.UpgradeInfo) error {
 	for node, info := range infos {
@@ -652,9 +571,9 @@ func checkReadyNodes(client client.Client, infos map[string]*upgradeinfo.Upgrade
 }
 
 // updateStatus set the status of instance to the given values
-func (r *ReconcileStaticPod) updateStaticPodStatus(instance *appsv1alpha1.StaticPod, totalNum, desiredNum, upgradedNum int32) (reconcile.Result, error) {
+func (r *ReconcileStaticPod) updateStaticPodStatus(instance *appsv1alpha1.StaticPod, totalNum, upgradedNum int32) (reconcile.Result, error) {
 	instance.Status.TotalNumber = totalNum
-	instance.Status.DesiredNumber = desiredNum
+	instance.Status.ReadyNumber = upgradedNum
 	instance.Status.UpgradedNumber = upgradedNum
 
 	if err := r.Client.Status().Update(context.TODO(), instance); err != nil {
diff --git a/pkg/controller/staticpod/upgradeinfo/upgrade_info.go b/pkg/controller/staticpod/upgradeinfo/upgrade_info.go
index 0dc8a275425..618e702ce24 100644
--- a/pkg/controller/staticpod/upgradeinfo/upgrade_info.go
+++ b/pkg/controller/staticpod/upgradeinfo/upgrade_info.go
@@ -28,6 +28,7 @@ import (
 )
 
 const (
+	StaticPodHashAnnotation     = "openyurt.io/static-pod-hash"
 	OTALatestManifestAnnotation = "openyurt.io/ota-latest-version"
 )
 
@@ -171,3 +172,21 @@ func UpgradedNodes(infos map[string]*UpgradeInfo) []string {
 	}
 	return nodes
 }
+
+// SetUpgradeNeededInfos sets `UpgradeNeeded` flag and counts the number of upgraded nodes
+func SetUpgradeNeededInfos(infos map[string]*UpgradeInfo, latestHash string) int32 {
+	var upgradedNumber int32
+
+	for _, info := range infos {
+		if info.StaticPod != nil {
+			if info.StaticPod.Annotations[StaticPodHashAnnotation] != latestHash {
+				// Indicate the static pod in this node needs to be upgraded
+				info.UpgradeNeeded = true
+				continue
+			}
+			upgradedNumber++
+		}
+	}
+
+	return upgradedNumber
+}
diff --git a/pkg/controller/staticpod/util/util.go b/pkg/controller/staticpod/util/util.go
index 3d7b28a2ebb..4a31d0d3229 100644
--- a/pkg/controller/staticpod/util/util.go
+++ b/pkg/controller/staticpod/util/util.go
@@ -198,23 +198,3 @@ func GetPodConditionFromList(conditions []corev1.PodCondition, conditionType cor
 	}
 	return -1, nil
 }
-
-// FailedError represents irreparable failure
-// 1. Worker pod failed
-// 2. New Static Pod failed
-type FailedError struct {
-	s string
-}
-
-// NewFailedError create a FailedError with node that failed to upgrade
-func NewFailedError(str string) error {
-	return &FailedError{str}
-}
-
-func (e *FailedError) Error() string { return e.s }
-
-// IsFailedError returns a boolean indicating whether the error is `FailedError`
-func IsFailedError(err error) bool {
-	_, ok := err.(*FailedError)
-	return ok
-}
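Reviewer note: the counting logic that moved into the exported `upgradeinfo.SetUpgradeNeededInfos` is easiest to sanity-check in isolation. Below is a minimal, self-contained Go sketch (not part of the patch) that mirrors the helper against trimmed-down stand-in types; `pod`, `upgradeInfo`, and the sample node names are illustrative only, not the controller's real definitions.

```go
package main

import "fmt"

// Stand-ins for the corev1.Pod / upgradeinfo.UpgradeInfo fields the helper touches.
type pod struct {
	Annotations map[string]string
}

type upgradeInfo struct {
	StaticPod     *pod
	UpgradeNeeded bool
}

const staticPodHashAnnotation = "openyurt.io/static-pod-hash"

// Mirrors SetUpgradeNeededInfos: nodes whose static pod hash differs from
// latestHash are flagged as needing an upgrade; the rest count as upgraded.
func setUpgradeNeededInfos(infos map[string]*upgradeInfo, latestHash string) int32 {
	var upgraded int32
	for _, info := range infos {
		if info.StaticPod == nil {
			continue
		}
		if info.StaticPod.Annotations[staticPodHashAnnotation] != latestHash {
			info.UpgradeNeeded = true
			continue
		}
		upgraded++
	}
	return upgraded
}

func main() {
	infos := map[string]*upgradeInfo{
		"node-a": {StaticPod: &pod{Annotations: map[string]string{staticPodHashAnnotation: "v2"}}},
		"node-b": {StaticPod: &pod{Annotations: map[string]string{staticPodHashAnnotation: "v1"}}},
	}
	// Prints 1: only node-a is already on the latest hash; node-b gets UpgradeNeeded=true.
	fmt.Println(setUpgradeNeededInfos(infos, "v2"))
}
```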
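Likewise, the worker-pod phase handling that was inlined into `Reconcile` boils down to a three-way decision. The sketch below restates it with stand-in types (again outside the patch, with hypothetical names) to make the failed/succeeded/other branches explicit:

```go
package main

import "fmt"

type phase string

const (
	podFailed    phase = "Failed"
	podSucceeded phase = "Succeeded"
	podRunning   phase = "Running"
)

// decide mirrors the switch added in Reconcile: a failed worker aborts the
// round with an error, a succeeded worker is queued for deletion, and any
// other phase (running, pending, unknown) marks the round as still in progress.
func decide(p phase) (deleteWorker bool, stillRunning bool, err error) {
	switch p {
	case podFailed:
		return false, false, fmt.Errorf("worker pod failed")
	case podSucceeded:
		return true, false, nil
	default:
		return false, true, nil
	}
}

func main() {
	for _, p := range []phase{podFailed, podSucceeded, podRunning} {
		del, running, err := decide(p)
		fmt.Printf("%-9s delete=%v running=%v err=%v\n", p, del, running, err)
	}
}
```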
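Finally, the widened `ArgTmpl` now hands the worker container the pod name, namespace, and target hash in addition to the manifest and mode, so node-servant can verify the result on the node. A rendered example follows; all values are hypothetical, and the `Auto` mode string assumes the literal value of `appsv1alpha1.AutoStaticPodUpgradeStrategyType`:

```go
package main

import "fmt"

// Copied from the patch: the full command line handed to the upgrade worker container.
const argTmpl = "/usr/local/bin/node-servant static-pod-upgrade --name=%s --namespace=%s --manifest=%s --hash=%s --mode=%s"

func main() {
	// Hypothetical StaticPod "nginx" on node "edge-0" in kube-system;
	// the name argument mirrors util.Hyphen(instance.Name, node).
	args := fmt.Sprintf(argTmpl, "nginx-edge-0", "kube-system", "nginx", "6bdfdd6789", "Auto")
	fmt.Println(args)
	// Output:
	// /usr/local/bin/node-servant static-pod-upgrade --name=nginx-edge-0 --namespace=kube-system --manifest=nginx --hash=6bdfdd6789 --mode=Auto
}
```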