From 8bf25d0a17656fedc31c46e9f33afe0129a6643b Mon Sep 17 00:00:00 2001 From: hxcGit Date: Fri, 24 Mar 2023 18:12:29 +0300 Subject: [PATCH] adjust static pod CRD spec Signed-off-by: hxcGit --- .../yurt-manager-auto-generated.yaml | 235 ++++++++++++++++++ pkg/apis/apps/v1alpha1/staticpod_types.go | 25 +- .../apps/v1alpha1/zz_generated.deepcopy.go | 142 ++++++++++- .../staticpod/staticpod_controller.go | 207 ++++++++++----- .../staticpod/upgradeinfo/upgrade_info.go | 7 +- pkg/controller/staticpod/util/util.go | 22 +- 6 files changed, 553 insertions(+), 85 deletions(-) diff --git a/charts/openyurt/templates/yurt-manager-auto-generated.yaml b/charts/openyurt/templates/yurt-manager-auto-generated.yaml index a122f941a7e..bc3fcd3c644 100644 --- a/charts/openyurt/templates/yurt-manager-auto-generated.yaml +++ b/charts/openyurt/templates/yurt-manager-auto-generated.yaml @@ -497,6 +497,161 @@ status: conditions: [] storedVersions: [] --- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.7.0 + creationTimestamp: null + name: staticpods.apps.openyurt.io +spec: + group: apps.openyurt.io + names: + kind: StaticPod + listKind: StaticPodList + plural: staticpods + shortNames: + - sp + singular: staticpod + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: CreationTimestamp is a timestamp representing the server time when + this object was created. It is not guaranteed to be set in happens-before + order across separate operations. Clients may not set this value. It is represented + in RFC3339 form and is in UTC. + jsonPath: .metadata.creationTimestamp + name: AGE + type: date + - description: The total number of static pods + jsonPath: .status.totalNumber + name: TotalNumber + type: integer + - description: The number of static pods that desired to be upgraded + jsonPath: .status.desiredNumber + name: DesiredNumber + type: integer + - description: The number of static pods that have been upgraded + jsonPath: .status.upgradedNumber + name: UpgradedNumber + type: integer + - jsonPath: .status.conditions[0].type + name: Status + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: StaticPod is the Schema for the staticpods API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: StaticPodSpec defines the desired state of StaticPod + properties: + revisionHistoryLimit: + description: The number of old history to retain to allow rollback. + Defaults to 10. + format: int32 + type: integer + staticPodManifest: + description: StaticPodManifest indicates the Static Pod desired to + be upgraded. The corresponding manifest file name is `StaticPodManifest.yaml`. + type: string + template: + description: An object that describes the desired upgrade static pod. + x-kubernetes-preserve-unknown-fields: true + upgradeStrategy: + description: An upgrade strategy to replace existing static pods with + new ones. + properties: + maxUnavailable: + anyOf: + - type: integer + - type: string + description: Auto upgrade config params. Present only if type + = "auto". + x-kubernetes-int-or-string: true + type: + description: Type of Static Pod upgrade. Can be "auto" or "ota". + type: string + type: object + type: object + status: + description: StaticPodStatus defines the observed state of StaticPod + properties: + conditions: + description: Represents the latest available observations of StaticPod's + current state. + items: + description: StaticPodCondition describes the state of a StaticPodCondition + at a certain point. + properties: + lastTransitionTime: + description: Last time the condition transitioned from one status + to another. + format: date-time + type: string + message: + description: A human-readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of StaticPod condition. + type: string + type: object + type: array + desiredNumber: + description: The number of nodes that are running static pods which + need to be upgraded. + format: int32 + type: integer + observedGeneration: + description: The most recent generation observed by the static pod + controller. + format: int64 + type: integer + totalNumber: + description: The total number of nodes that are running the static + pod. + format: int32 + type: integer + upgradedNumber: + description: The number of nodes that are running updated static pod. + format: int32 + type: integer + required: + - desiredNumber + - totalNumber + - upgradedNumber + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] +--- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: @@ -553,6 +708,32 @@ rules: - get - patch - update +- apiGroups: + - apps.openyurt.io + resources: + - staticpods + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - apps.openyurt.io + resources: + - staticpods/finalizers + verbs: + - update +- apiGroups: + - apps.openyurt.io + resources: + - staticpods/status + verbs: + - get + - patch + - update - apiGroups: - certificates.k8s.io resources: @@ -588,6 +769,18 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - configmaps + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: @@ -736,6 +929,27 @@ webhooks: resources: - nodepools sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: kube-system + path: /mutate-apps-openyurt-io-v1alpha1-staticpod + failurePolicy: Fail + name: mutate.apps.v1alpha1.staticpod.openyurt.io + rules: + - apiGroups: + - apps.openyurt.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - staticpods + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration @@ -785,3 +999,24 @@ webhooks: resources: - nodepools sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: kube-system + path: /validate-apps-openyurt-io-v1alpha1-staticpod + failurePolicy: Fail + name: validate.apps.v1alpha1.staticpod.openyurt.io + rules: + - apiGroups: + - apps.openyurt.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - staticpods + sideEffects: None diff --git a/pkg/apis/apps/v1alpha1/staticpod_types.go b/pkg/apis/apps/v1alpha1/staticpod_types.go index 080b27b5c3b..2b42b8f6085 100644 --- a/pkg/apis/apps/v1alpha1/staticpod_types.go +++ b/pkg/apis/apps/v1alpha1/staticpod_types.go @@ -42,21 +42,18 @@ const ( // StaticPodSpec defines the desired state of StaticPod type StaticPodSpec struct { - // StaticPodName indicates the static pod desired to be upgraded. - StaticPodName string `json:"staticPodName"` - // StaticPodManifest indicates the Static Pod desired to be upgraded. The corresponding // manifest file name is `StaticPodManifest.yaml`. - // +optional StaticPodManifest string `json:"staticPodManifest,omitempty"` - // Namespace indicates the namespace of target static pod - // +optional - StaticPodNamespace string `json:"staticPodNamespace,omitempty"` - // An upgrade strategy to replace existing static pods with new ones. UpgradeStrategy StaticPodUpgradeStrategy `json:"upgradeStrategy,omitempty"` + // The number of old history to retain to allow rollback. + // Defaults to 10. + // +optional + RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"` + // An object that describes the desired upgrade static pod. // +optional // +kubebuilder:pruning:PreserveUnknownFields @@ -97,15 +94,19 @@ type StaticPodCondition struct { // StaticPodStatus defines the observed state of StaticPod type StaticPodStatus struct { - // The total number of static pods + // The total number of nodes that are running the static pod. TotalNumber int32 `json:"totalNumber"` - // The number of static pods that should be upgraded. + // The number of nodes that are running static pods which need to be upgraded. DesiredNumber int32 `json:"desiredNumber"` - // The number of static pods that have been upgraded. + // The number of nodes that are running updated static pod. UpgradedNumber int32 `json:"upgradedNumber"` + // The most recent generation observed by the static pod controller. + // +optional + ObservedGeneration int64 `json:"observedGeneration"` + // Represents the latest available observations of StaticPod's current state. // +optional Conditions []StaticPodCondition `json:"conditions"` @@ -115,7 +116,7 @@ type StaticPodStatus struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true // +kubebuilder:subresource:status -// +kubebuilder:resource:scope=Cluster,path=staticpods,shortName=sp,categories=all +// +kubebuilder:resource:shortName=sp // +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC." //+kubebuilder:printcolumn:name="TotalNumber",type="integer",JSONPath=".status.totalNumber",description="The total number of static pods" //+kubebuilder:printcolumn:name="DesiredNumber",type="integer",JSONPath=".status.desiredNumber",description="The number of static pods that desired to be upgraded" diff --git a/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go index 69e3dc08139..09c4eaf89af 100644 --- a/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -23,8 +23,9 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -146,3 +147,142 @@ func (in *NodePoolStatus) DeepCopy() *NodePoolStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StaticPod) DeepCopyInto(out *StaticPod) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StaticPod. +func (in *StaticPod) DeepCopy() *StaticPod { + if in == nil { + return nil + } + out := new(StaticPod) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *StaticPod) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StaticPodCondition) DeepCopyInto(out *StaticPodCondition) { + *out = *in + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StaticPodCondition. +func (in *StaticPodCondition) DeepCopy() *StaticPodCondition { + if in == nil { + return nil + } + out := new(StaticPodCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StaticPodList) DeepCopyInto(out *StaticPodList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]StaticPod, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StaticPodList. +func (in *StaticPodList) DeepCopy() *StaticPodList { + if in == nil { + return nil + } + out := new(StaticPodList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *StaticPodList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StaticPodSpec) DeepCopyInto(out *StaticPodSpec) { + *out = *in + in.UpgradeStrategy.DeepCopyInto(&out.UpgradeStrategy) + if in.RevisionHistoryLimit != nil { + in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit + *out = new(int32) + **out = **in + } + in.Template.DeepCopyInto(&out.Template) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StaticPodSpec. +func (in *StaticPodSpec) DeepCopy() *StaticPodSpec { + if in == nil { + return nil + } + out := new(StaticPodSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StaticPodStatus) DeepCopyInto(out *StaticPodStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]StaticPodCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StaticPodStatus. +func (in *StaticPodStatus) DeepCopy() *StaticPodStatus { + if in == nil { + return nil + } + out := new(StaticPodStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StaticPodUpgradeStrategy) DeepCopyInto(out *StaticPodUpgradeStrategy) { + *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StaticPodUpgradeStrategy. +func (in *StaticPodUpgradeStrategy) DeepCopy() *StaticPodUpgradeStrategy { + if in == nil { + return nil + } + out := new(StaticPodUpgradeStrategy) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/controller/staticpod/staticpod_controller.go b/pkg/controller/staticpod/staticpod_controller.go index e0bd900fe3f..a15379a4b55 100644 --- a/pkg/controller/staticpod/staticpod_controller.go +++ b/pkg/controller/staticpod/staticpod_controller.go @@ -20,6 +20,7 @@ import ( "context" "flag" "fmt" + "time" corev1 "k8s.io/api/core/v1" kerr "k8s.io/apimachinery/pkg/api/errors" @@ -61,7 +62,8 @@ var ( const ( controllerName = "StaticPod-controller" - StaticPodHashAnnotation = "openyurt.io/static-pod-hash" + StaticPodHashAnnotation = "openyurt.io/static-pod-hash" + OTALatestManifestAnnotation = "openyurt.io/ota-latest-version" hostPathVolumeName = "hostpath" hostPathVolumeMountPath = "/etc/kubernetes/manifests/" @@ -73,9 +75,8 @@ const ( UpgradeWorkerPodPrefix = "yurt-static-pod-upgrade-worker-" UpgradeWorkerContainerName = "upgrade-worker" UpgradeWorkerImage = "openyurt/yurt-static-pod-upgrade:latest" - UpgradeServiceAccount = "yurt-manager" - ArgTmpl = "/usr/local/bin/yurt-static-pod-upgrade --name=%s --manifest=%s --hash=%s --namespace=%s --mode=%s" + ArgTmpl = "/usr/local/bin/yurt-static-pod-upgrade --manifest=%s --mode=%s" ) // upgradeWorker is the pod template used for static pod upgrade @@ -86,12 +87,10 @@ const ( // 4. the corresponding configmap var ( upgradeWorker = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem}, Spec: corev1.PodSpec{ - HostPID: true, - HostNetwork: true, - RestartPolicy: corev1.RestartPolicyNever, - ServiceAccountName: UpgradeServiceAccount, + HostPID: true, + HostNetwork: true, + RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{{ Name: UpgradeWorkerContainerName, Command: []string{"/bin/sh", "-c"}, @@ -185,7 +184,8 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { var requests []reconcile.Request for _, staticPod := range staticPodList.Items { requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{ - Name: staticPod.Name, + Namespace: staticPod.Namespace, + Name: staticPod.Name, }}) } return requests @@ -300,31 +300,60 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ return ctrl.Result{}, err } - // Count the number of upgraded nodes - upgradedNumber = setUpgradeNeededInfos(upgradeInfos, latestHash) + // Complete upgrade info + { + // Count the number of upgraded nodes + upgradedNumber = setUpgradeNeededInfos(upgradeInfos, latestHash) - // Check node ready info - if err := checkReadyNodes(r.Client, upgradeInfos); err != nil { - klog.Errorf(Format("Fail to check node ready status of StaticPod %v,%v", request.NamespacedName.Name, err)) - return ctrl.Result{}, err + // Set node ready info + if err := checkReadyNodes(r.Client, upgradeInfos); err != nil { + klog.Errorf(Format("Fail to check node ready status of StaticPod %v,%v", request.NamespacedName.Name, err)) + return ctrl.Result{}, err + } } - // allSucceeded flag is used to indicate whether the worker pods in last round have been all succeeded. - // In auto upgrade mode, if the value is false, it will wait util all the worker pods succeed. - failedNode, allSucceeded, deletePods := checkWorkerPodInfos(upgradeInfos, latestHash) - // If node is not empty, it means the worker pod failed in this node. - if failedNode != "" { - klog.Errorf(Format("Fail to continue upgrade, cause worker pod of StaticPod %v in node %s failed", request.NamespacedName.Name, failedNode)) - return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber, util.UpgradeFailedConditionWithNode(failedNode)) + // Sync worker pods + var allSucceeded bool + deletePods := make([]*corev1.Pod, 0) + succeededNodes := make([]string, 0) + { + // Deal with worker pods + allSucceeded, err = r.checkWorkerPods(upgradeInfos, latestHash, &deletePods, &succeededNodes) + if err != nil { + klog.Errorf(Format("Fail to continue upgrade, cause worker pod of StaticPod %v in node %v failed", + request.NamespacedName.Name, err.Error())) + return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber, + util.UpgradeFailedConditionWithNode(err.Error())) + } + + // Verify succeededNodes + if instance.Spec.UpgradeStrategy.Type == appsv1alpha1.AutoStaticPodUpgradeStrategyType { + ok, err := r.verifySucceededNodes(instance, succeededNodes, latestHash) + if util.IsFailedError(err) { + klog.Errorf(Format("Fail to verify succeededNodes of StaticPod %v, %v", request.NamespacedName.Name, err)) + return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber, + util.UpgradeFailedConditionWithNode(err.Error())) + } + + if err != nil { + klog.Errorf(Format("Fail to verify succeededNodes of StaticPod %v, %v", request.NamespacedName.Name, err)) + return reconcile.Result{}, err + } + + if !ok { + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + } } - // Clean up the unused worker pods + // Clean up unused pods if err := r.removeUnusedPods(deletePods); err != nil { - klog.Errorf("Fail to delete out-of-date worker pods of StaticPod %v, %v", request.NamespacedName.Name, err) - return ctrl.Result{}, nil + klog.Errorf(Format("Fail to remove unused pods of StaticPod %v, %v", request.NamespacedName.Name, err)) + return reconcile.Result{}, err } // If all nodes have been upgraded, just return + // Put this here because we need to clean up the worker pods first if totalNumber == upgradedNumber { klog.Infof(Format("All static pods have been upgraded of StaticPod %v", request.NamespacedName.Name)) return r.updateStaticPodStatus(instance, totalNumber, instance.Status.DesiredNumber, upgradedNumber, upgradeSuccessCondition) @@ -367,15 +396,15 @@ func (r *ReconcileStaticPod) Reconcile(_ context.Context, request reconcile.Requ // syncConfigMap moves the target static pod's corresponding configmap to the latest state func (r *ReconcileStaticPod) syncConfigMap(instance *appsv1alpha1.StaticPod, hash, data string) error { - cmName := util.WithConfigMapPrefix(util.Hyphen(instance.Spec.StaticPodNamespace, instance.Spec.StaticPodName)) + cmName := util.WithConfigMapPrefix(util.Hyphen(instance.Namespace, instance.Name)) cm := &corev1.ConfigMap{} - if err := r.Get(context.TODO(), types.NamespacedName{Name: cmName, Namespace: metav1.NamespaceSystem}, cm); err != nil { + if err := r.Get(context.TODO(), types.NamespacedName{Name: cmName, Namespace: instance.Namespace}, cm); err != nil { // if the configmap does not exist, then create a new one if kerr.IsNotFound(err) { cm = &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: cmName, - Namespace: metav1.NamespaceSystem, + Namespace: instance.Namespace, Annotations: map[string]string{ StaticPodHashAnnotation: hash, }, @@ -406,6 +435,62 @@ func (r *ReconcileStaticPod) syncConfigMap(instance *appsv1alpha1.StaticPod, has return nil } +// syncWorkerPod synchronizes the worker pods for each node which has +func (r *ReconcileStaticPod) checkWorkerPods(infos map[string]*upgradeinfo.UpgradeInfo, latestHash string, + deletePods *[]*corev1.Pod, succeededNodes *[]string) (bool, error) { + allSucceeded := true + + for node, info := range infos { + if info.WorkerPod == nil { + continue + } + + hash := info.WorkerPod.Annotations[StaticPodHashAnnotation] + // If the worker pod is not up-to-date, then it can be recreated directly + if hash != latestHash { + *deletePods = append(*deletePods, info.WorkerPod) + continue + } + // If the worker pod is up-to-date, there are three possible situations + // 1. The worker pod is failed, then some irreparable failure has occurred. Just stop reconcile and update status + // 2. The worker pod is succeeded, then this node must be up-to-date. Just delete this worker pod + // 3. The worker pod is running, pending or unknown, then just wait + switch info.WorkerPod.Status.Phase { + case corev1.PodFailed: + return false, util.NewFailedError(node) + case corev1.PodSucceeded: + *succeededNodes = append(*succeededNodes, node) + *deletePods = append(*deletePods, info.WorkerPod) + default: + // In this node, the latest worker pod is still running, and we don't need to create new worker for it. + info.WorkerPodRunning = true + allSucceeded = false + } + } + + return allSucceeded, nil +} + +// verifyUpgrade verify that whether the new static pods on the given nodes are ready +func (r *ReconcileStaticPod) verifySucceededNodes(instance *appsv1alpha1.StaticPod, nodes []string, hash string) (bool, error) { + pod := &corev1.Pod{} + for _, node := range nodes { + if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: util.Hyphen(instance.Name, node)}, pod); err != nil { + return false, err + } + + if pod.Status.Phase == corev1.PodFailed { + return false, util.NewFailedError(node) + } + + if pod.Status.Phase != corev1.PodRunning || pod.Annotations[StaticPodHashAnnotation] != hash { + klog.V(5).Infof("Fail to verify new static pod on %v", node) + return false, nil + } + } + return true, nil +} + // autoUpgrade automatically rolling upgrade the target static pods in cluster func (r *ReconcileStaticPod) autoUpgrade(instance *appsv1alpha1.StaticPod, infos map[string]*upgradeinfo.UpgradeInfo, hash string) error { // readyUpgradeWaitingNodes represents nodes that need to create worker pods @@ -458,6 +543,29 @@ func (r *ReconcileStaticPod) otaUpgrade(instance *appsv1alpha1.StaticPod, infos return err } + if err := r.setLatestManifestHash(instance, readyUpgradeWaitingNodes, hash); err != nil { + return err + } + + return nil +} + +// setLatestManifestHash set the latest manifest hash value to target static pod annotation +// TODO: In ota mode, it's hard for controller to check whether the latest manifest file has been issued to nodes +// TODO: Use annotation `openyurt.io/ota-manifest-version` to indicate the version of manifest issued to nodes +func (r *ReconcileStaticPod) setLatestManifestHash(instance *appsv1alpha1.StaticPod, nodes []string, hash string) error { + pod := &corev1.Pod{} + for _, node := range nodes { + if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, + Name: util.Hyphen(instance.Name, node)}, pod); err != nil { + return err + } + + metav1.SetMetaDataAnnotation(&pod.ObjectMeta, OTALatestManifestAnnotation, hash) + if err := r.Client.Update(context.TODO(), pod, &client.UpdateOptions{}); err != nil { + return err + } + } return nil } @@ -476,7 +584,8 @@ func (r *ReconcileStaticPod) removeUnusedPods(pods []*corev1.Pod) error { func createUpgradeWorker(c client.Client, instance *appsv1alpha1.StaticPod, nodes []string, hash, mode string) error { for _, node := range nodes { pod := upgradeWorker.DeepCopy() - pod.Name = UpgradeWorkerPodPrefix + node + "-" + hash + pod.Name = UpgradeWorkerPodPrefix + util.Hyphen(node, hash) + pod.Namespace = instance.Namespace pod.Spec.NodeName = node metav1.SetMetaDataAnnotation(&pod.ObjectMeta, StaticPodHashAnnotation, hash) pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ @@ -484,12 +593,12 @@ func createUpgradeWorker(c client.Client, instance *appsv1alpha1.StaticPod, node VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: util.WithConfigMapPrefix(util.Hyphen(instance.Spec.StaticPodNamespace, instance.Spec.StaticPodName)), + Name: util.WithConfigMapPrefix(util.Hyphen(instance.Namespace, instance.Name)), }, }, }, }) - pod.Spec.Containers[0].Args = []string{fmt.Sprintf(ArgTmpl, util.Hyphen(instance.Spec.StaticPodName, node), instance.Spec.StaticPodManifest, hash, instance.Spec.StaticPodNamespace, mode)} + pod.Spec.Containers[0].Args = []string{fmt.Sprintf(ArgTmpl, instance.Spec.StaticPodManifest, mode)} if err := controllerutil.SetControllerReference(instance, pod, c.Scheme()); err != nil { return err @@ -534,42 +643,6 @@ func checkReadyNodes(client client.Client, infos map[string]*upgradeinfo.Upgrade return nil } -// checkWorkerPodInfos removes worker pods which are not in use and checks whether the last round is complete -func checkWorkerPodInfos(infos map[string]*upgradeinfo.UpgradeInfo, latestHash string) (string, bool, []*corev1.Pod) { - allSucceeded := true - deletePods := make([]*corev1.Pod, 0) - - for node, info := range infos { - if info.WorkerPod != nil { - hash := info.WorkerPod.Annotations[StaticPodHashAnnotation] - // If the worker pod is not up-to-date, then it can be recreated directly - if hash != latestHash { - deletePods = append(deletePods, info.WorkerPod) - continue - } - - // If the worker pod is up-to-date, there are three possible situations - // 1. The worker pod is failed, then some irreparable failure has occurred. Just stop reconcile and update status - // 2. The worker pod is succeeded, then this node must be up-to-date. Just delete this worker pod - // 3. The worker pod is running, pending or unknown, then just wait - switch info.WorkerPod.Status.Phase { - case corev1.PodFailed: - return node, false, deletePods - - case corev1.PodSucceeded: - deletePods = append(deletePods, info.WorkerPod) - - default: - // In this node, the latest worker pod is still running, and we don't need to create new worker for it. - info.WorkerPodRunning = true - allSucceeded = false - } - } - } - - return "", allSucceeded, deletePods -} - // updateStatus set the status of instance to the given values func (r *ReconcileStaticPod) updateStaticPodStatus(instance *appsv1alpha1.StaticPod, totalNum, desiredNum, upgradedNum int32, cond *appsv1alpha1.StaticPodCondition) (reconcile.Result, error) { instance.Status.Conditions = []appsv1alpha1.StaticPodCondition{*cond} diff --git a/pkg/controller/staticpod/upgradeinfo/upgrade_info.go b/pkg/controller/staticpod/upgradeinfo/upgrade_info.go index cab4d983030..0dc8a275425 100644 --- a/pkg/controller/staticpod/upgradeinfo/upgrade_info.go +++ b/pkg/controller/staticpod/upgradeinfo/upgrade_info.go @@ -21,7 +21,6 @@ import ( "strings" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" appsv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1" @@ -59,11 +58,11 @@ func New(c client.Client, instance *appsv1alpha1.StaticPod, workerPodName string // upgrade worker pod is default in "kube-system" namespace which may be different with target static pod's namespace var podList, systemPodList corev1.PodList - if err := c.List(context.TODO(), &podList, &client.ListOptions{Namespace: instance.Spec.StaticPodNamespace}); err != nil { + if err := c.List(context.TODO(), &podList, &client.ListOptions{Namespace: instance.Namespace}); err != nil { return nil, err } - if err := c.List(context.TODO(), &systemPodList, &client.ListOptions{Namespace: metav1.NamespaceSystem}); err != nil { + if err := c.List(context.TODO(), &systemPodList, &client.ListOptions{Namespace: instance.Namespace}); err != nil { return nil, err } @@ -74,7 +73,7 @@ func New(c client.Client, instance *appsv1alpha1.StaticPod, workerPodName string } // The name format of mirror static pod is `StaticPodName-NodeName` - if util.Hyphen(instance.Spec.StaticPodName, nodeName) == pod.Name && isStaticPod(&pod) { + if util.Hyphen(instance.Name, nodeName) == pod.Name && isStaticPod(&pod) { if info := infos[nodeName]; info == nil { infos[nodeName] = &UpgradeInfo{} } diff --git a/pkg/controller/staticpod/util/util.go b/pkg/controller/staticpod/util/util.go index 35b62aef099..c2de49e4aa7 100644 --- a/pkg/controller/staticpod/util/util.go +++ b/pkg/controller/staticpod/util/util.go @@ -109,7 +109,7 @@ func GenStaticPodManifest(tmplSpec *corev1.PodTemplateSpec, hash string) (string // NodeReadyByName check if the given node is ready func NodeReadyByName(c client.Client, nodeName string) (bool, error) { node := &corev1.Node{} - if err := c.Get(context.TODO(), types.NamespacedName{Name: nodeName}, node);err != nil { + if err := c.Get(context.TODO(), types.NamespacedName{Name: nodeName}, node); err != nil { return false, err } @@ -214,3 +214,23 @@ func GetPodConditionFromList(conditions []corev1.PodCondition, conditionType cor } return -1, nil } + +// FailedError represents irreparable failure +// 1. Worker pod failed +// 2. New Static Pod failed +type FailedError struct { + s string +} + +// NewFailedError create a FailedError with node that failed to upgrade +func NewFailedError(str string) error { + return &FailedError{str} +} + +func (e *FailedError) Error() string { return e.s } + +// IsFailedError returns a boolean indicating whether the error is `FailedError` +func IsFailedError(err error) bool { + _, ok := err.(*FailedError) + return ok +}