From 9e2847b9354fc81354f92df4034f088727ac26e1 Mon Sep 17 00:00:00 2001 From: Wei-Xiang Sun Date: Wed, 29 Dec 2021 11:28:33 +0800 Subject: [PATCH] store history revisions for sidecarset (#715) Signed-off-by: veophi --- apis/apps/defaults/v1alpha1.go | 13 +- apis/apps/v1alpha1/sidecarset_types.go | 12 + apis/apps/v1alpha1/zz_generated.deepcopy.go | 12 +- .../crd/bases/apps.kruise.io_sidecarsets.yaml | 16 ++ pkg/control/sidecarcontrol/history_control.go | 173 ++++++++++++++ pkg/control/sidecarcontrol/util.go | 25 +- .../sidecarset/sidecarset_controller.go | 1 + .../sidecarset/sidecarset_controller_test.go | 7 + .../sidecarset/sidecarset_hotupgrade_test.go | 4 +- .../sidecarset/sidecarset_processor.go | 169 +++++++++++++- .../sidecarset/sidecarset_processor_test.go | 217 ++++++++++++++++++ .../sidecarset/sidecarset_strategy_test.go | 2 + test/e2e/apps/sidecarset.go | 67 ++++++ test/e2e/framework/sidecarset_utils.go | 16 ++ 14 files changed, 718 insertions(+), 16 deletions(-) create mode 100644 pkg/control/sidecarcontrol/history_control.go diff --git a/apis/apps/defaults/v1alpha1.go b/apis/apps/defaults/v1alpha1.go index 0d29bff3de..50d28cc891 100644 --- a/apis/apps/defaults/v1alpha1.go +++ b/apis/apps/defaults/v1alpha1.go @@ -28,7 +28,7 @@ import ( // SetDefaults_SidecarSet set default values for SidecarSet. func SetDefaultsSidecarSet(obj *v1alpha1.SidecarSet) { - setSidecarSetUpdateStratety(&obj.Spec.UpdateStrategy) + setSidecarSetUpdateStrategy(&obj.Spec.UpdateStrategy) for i := range obj.Spec.InitContainers { setSidecarDefaultContainer(&obj.Spec.InitContainers[i]) @@ -40,6 +40,15 @@ func SetDefaultsSidecarSet(obj *v1alpha1.SidecarSet) { //default setting volumes SetDefaultPodVolumes(obj.Spec.Volumes) + + //default setting history revision limitation + SetDefaultRevisionHistoryLimit(&obj.Spec.RevisionHistoryLimit) +} + +func SetDefaultRevisionHistoryLimit(revisionHistoryLimit **int32) { + if *revisionHistoryLimit == nil { + *revisionHistoryLimit = utilpointer.Int32Ptr(10) + } } func setDefaultSidecarContainer(sidecarContainer *v1alpha1.SidecarContainer) { @@ -56,7 +65,7 @@ func setDefaultSidecarContainer(sidecarContainer *v1alpha1.SidecarContainer) { setSidecarDefaultContainer(sidecarContainer) } -func setSidecarSetUpdateStratety(strategy *v1alpha1.SidecarSetUpdateStrategy) { +func setSidecarSetUpdateStrategy(strategy *v1alpha1.SidecarSetUpdateStrategy) { if strategy.Type == "" { strategy.Type = v1alpha1.RollingUpdateSidecarSetStrategyType } diff --git a/apis/apps/v1alpha1/sidecarset_types.go b/apis/apps/v1alpha1/sidecarset_types.go index 914199a4a2..b150e6dfe4 100644 --- a/apis/apps/v1alpha1/sidecarset_types.go +++ b/apis/apps/v1alpha1/sidecarset_types.go @@ -52,6 +52,10 @@ type SidecarSetSpec struct { // List of the names of secrets required by pulling sidecar container images ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + + // RevisionHistoryLimit indicates the maximum quantity of stored revisions about the SidecarSet. + // default value is 10 + RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"` } // SidecarContainer defines the container of Sidecar @@ -202,6 +206,14 @@ type SidecarSetStatus struct { // updatedReadyPods is the number of matched pods that updated and ready UpdatedReadyPods int32 `json:"updatedReadyPods,omitempty"` + + // LatestRevision, if not empty, indicates the latest controllerRevision name of the SidecarSet. + LatestRevision string `json:"latestRevision,omitempty"` + + // CollisionCount is the count of hash collisions for the SidecarSet. The SidecarSet controller + // uses this field as a collision avoidance mechanism when it needs to create the name for the + // newest ControllerRevision. + CollisionCount *int32 `json:"collisionCount,omitempty"` } // +genclient diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 496278389a..bdf8c1ff5c 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -1976,7 +1976,7 @@ func (in *SidecarSet) DeepCopyInto(out *SidecarSet) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SidecarSet. @@ -2080,6 +2080,11 @@ func (in *SidecarSetSpec) DeepCopyInto(out *SidecarSetSpec) { *out = make([]v1.LocalObjectReference, len(*in)) copy(*out, *in) } + if in.RevisionHistoryLimit != nil { + in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SidecarSetSpec. @@ -2095,6 +2100,11 @@ func (in *SidecarSetSpec) DeepCopy() *SidecarSetSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SidecarSetStatus) DeepCopyInto(out *SidecarSetStatus) { *out = *in + if in.CollisionCount != nil { + in, out := &in.CollisionCount, &out.CollisionCount + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SidecarSetStatus. diff --git a/config/crd/bases/apps.kruise.io_sidecarsets.yaml b/config/crd/bases/apps.kruise.io_sidecarsets.yaml index 5d140a7a87..db5bc300ed 100644 --- a/config/crd/bases/apps.kruise.io_sidecarsets.yaml +++ b/config/crd/bases/apps.kruise.io_sidecarsets.yaml @@ -234,6 +234,11 @@ spec: description: Namespace sidecarSet will only match the pods in the namespace otherwise, match pods in all namespaces(in cluster) type: string + revisionHistoryLimit: + description: RevisionHistoryLimit indicates the maximum quantity of + stored revisions about the SidecarSet. default value is 10 + format: int32 + type: integer selector: description: selector is a label query over pods that should be injected properties: @@ -387,6 +392,17 @@ spec: status: description: SidecarSetStatus defines the observed state of SidecarSet properties: + collisionCount: + description: CollisionCount is the count of hash collisions for the + SidecarSet. The SidecarSet controller uses this field as a collision + avoidance mechanism when it needs to create the name for the newest + ControllerRevision. + format: int32 + type: integer + latestRevision: + description: LatestRevision, if not empty, indicates the latest controllerRevision + name of the SidecarSet. + type: string matchedPods: description: matchedPods is the number of Pods whose labels are matched with this SidecarSet's selector and are created after sidecarset diff --git a/pkg/control/sidecarcontrol/history_control.go b/pkg/control/sidecarcontrol/history_control.go new file mode 100644 index 0000000000..2323e13756 --- /dev/null +++ b/pkg/control/sidecarcontrol/history_control.go @@ -0,0 +1,173 @@ +/* +Copyright 2021 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sidecarcontrol + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + + apps "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/kubernetes/pkg/controller/history" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + patchCodec = scheme.Codecs.LegacyCodec(appsv1alpha1.SchemeGroupVersion) +) + +type HistoryControl interface { + CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) + NewRevision(s *appsv1alpha1.SidecarSet, namespace string, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) + NextRevision(revisions []*apps.ControllerRevision) int64 + GetRevisionLabelSelector(s *appsv1alpha1.SidecarSet) *metav1.LabelSelector +} + +type realControl struct { + Client client.Client +} + +func NewHistoryControl(client client.Client) HistoryControl { + return &realControl{ + Client: client, + } +} + +func (r *realControl) NewRevision(s *appsv1alpha1.SidecarSet, namespace string, revision int64, collisionCount *int32) ( + *apps.ControllerRevision, error, +) { + patch, err := r.getPatch(s) + if err != nil { + return nil, err + } + + cr, err := history.NewControllerRevision(s, + s.GetObjectKind().GroupVersionKind(), + s.Labels, + runtime.RawExtension{Raw: patch}, + revision, + collisionCount) + if err != nil { + return nil, err + } + + cr.SetNamespace(namespace) + if cr.Labels == nil { + cr.Labels = make(map[string]string) + } + if cr.ObjectMeta.Annotations == nil { + cr.ObjectMeta.Annotations = make(map[string]string) + } + cr.Labels[SidecarSetKindName] = s.Name + for key, value := range s.Annotations { + cr.ObjectMeta.Annotations[key] = value + } + return cr, nil +} + +// getPatch returns a strategic merge patch that can be applied to restore a SidecarSet to a +// previous version. If the returned error is nil the patch is valid. The current state that we save is just the +// PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously +// recorded patches. +func (r *realControl) getPatch(s *appsv1alpha1.SidecarSet) ([]byte, error) { + str, err := runtime.Encode(patchCodec, s) + if err != nil { + return nil, err + } + var raw map[string]interface{} + _ = json.Unmarshal(str, &raw) + + objCopy := make(map[string]interface{}) + specCopy := make(map[string]interface{}) + // only copy some specified fields of s.Spec to specCopy + spec := raw["spec"].(map[string]interface{}) + copySidecarSetSpecRevision(specCopy, spec) + + objCopy["spec"] = specCopy + return json.Marshal(objCopy) +} + +// NextRevision finds the next valid revision number based on revisions. If the length of revisions +// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method +// assumes that revisions has been sorted by Revision. +func (r *realControl) NextRevision(revisions []*apps.ControllerRevision) int64 { + count := len(revisions) + if count <= 0 { + return 1 + } + return revisions[count-1].Revision + 1 +} + +func (r *realControl) GetRevisionLabelSelector(s *appsv1alpha1.SidecarSet) *metav1.LabelSelector { + return &metav1.LabelSelector{ + MatchLabels: map[string]string{ + SidecarSetKindName: s.GetName(), + }, + } +} + +func (r *realControl) CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) { + if collisionCount == nil { + return nil, fmt.Errorf("collisionCount should not be nil") + } + + // Clone the input + clone := revision.DeepCopy() + + // Continue to attempt to create the revision updating the name with a new hash on each iteration + for { + hash := history.HashControllerRevision(revision, collisionCount) + // Update the revisions name + clone.Name = history.ControllerRevisionName(parent.GetName(), hash) + err := r.Client.Create(context.TODO(), clone) + if errors.IsAlreadyExists(err) { + exists := &apps.ControllerRevision{} + key := types.NamespacedName{ + Namespace: clone.Namespace, + Name: clone.Name, + } + err = r.Client.Get(context.TODO(), key, exists) + if err != nil { + return nil, err + } + if bytes.Equal(exists.Data.Raw, clone.Data.Raw) { + return exists, nil + } + *collisionCount++ + continue + } + return clone, err + } +} + +func copySidecarSetSpecRevision(dst, src map[string]interface{}) { + // we will use patch instead of update operation to update pods in the future + // dst["$patch"] = "replace" + // only record these revisions + dst["volumes"] = src["volumes"] + dst["containers"] = src["containers"] + dst["initContainers"] = src["initContainers"] + dst["imagePullSecrets"] = src["imagePullSecrets"] +} diff --git a/pkg/control/sidecarcontrol/util.go b/pkg/control/sidecarcontrol/util.go index 1bbb00bcbd..0857dba7ad 100644 --- a/pkg/control/sidecarcontrol/util.go +++ b/pkg/control/sidecarcontrol/util.go @@ -38,6 +38,8 @@ import ( ) const ( + SidecarSetKindName = "kruise.io/sidecarset-name" + // SidecarSetHashAnnotation represents the key of a sidecarSet hash SidecarSetHashAnnotation = "kruise.io/sidecarset-hash" // SidecarSetHashWithoutImageAnnotation represents the key of a sidecarset hash without images of sidecar @@ -62,10 +64,11 @@ var ( ) type SidecarSetUpgradeSpec struct { - UpdateTimestamp metav1.Time `json:"updateTimestamp"` - SidecarSetHash string `json:"hash"` - SidecarSetName string `json:"sidecarSetName"` - SidecarList []string `json:"sidecarList"` + UpdateTimestamp metav1.Time `json:"updateTimestamp"` + SidecarSetHash string `json:"hash"` + SidecarSetName string `json:"sidecarSetName"` + SidecarList []string `json:"sidecarList"` // sidecarSet container list + SidecarSetControllerRevision string `json:"controllerRevision"` // sidecarSet controllerRevision name } // PodMatchSidecarSet determines if pod match Selector of sidecar. @@ -109,6 +112,11 @@ func GetPodSidecarSetRevision(sidecarSetName string, pod metav1.Object) string { return upgradeSpec.SidecarSetHash } +func GetPodSidecarSetControllerRevision(sidecarSetName string, pod metav1.Object) string { + upgradeSpec := GetPodSidecarSetUpgradeSpecInAnnotations(sidecarSetName, SidecarSetHashAnnotation, pod) + return upgradeSpec.SidecarSetControllerRevision +} + func GetPodSidecarSetUpgradeSpecInAnnotations(sidecarSetName, annotationKey string, pod metav1.Object) SidecarSetUpgradeSpec { annotations := pod.GetAnnotations() hashKey := annotationKey @@ -183,10 +191,11 @@ func updatePodSidecarSetHash(pod *corev1.Pod, sidecarSet *appsv1alpha1.SidecarSe } sidecarSetHash[sidecarSet.Name] = SidecarSetUpgradeSpec{ - UpdateTimestamp: metav1.Now(), - SidecarSetHash: GetSidecarSetRevision(sidecarSet), - SidecarSetName: sidecarSet.Name, - SidecarList: sidecarList.List(), + UpdateTimestamp: metav1.Now(), + SidecarSetHash: GetSidecarSetRevision(sidecarSet), + SidecarSetName: sidecarSet.Name, + SidecarList: sidecarList.List(), + SidecarSetControllerRevision: sidecarSet.Status.LatestRevision, } newHash, _ := json.Marshal(sidecarSetHash) pod.Annotations[hashKey] = string(newHash) diff --git a/pkg/controller/sidecarset/sidecarset_controller.go b/pkg/controller/sidecarset/sidecarset_controller.go index b5b0ecec6d..a84407a3fa 100644 --- a/pkg/controller/sidecarset/sidecarset_controller.go +++ b/pkg/controller/sidecarset/sidecarset_controller.go @@ -122,6 +122,7 @@ type ReconcileSidecarSet struct { // +kubebuilder:rbac:groups=apps.kruise.io,resources=sidecarsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps.kruise.io,resources=sidecarsets/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete // Reconcile reads that state of the cluster for a SidecarSet object and makes changes based on the state read // and what is in the SidecarSet.Spec diff --git a/pkg/controller/sidecarset/sidecarset_controller_test.go b/pkg/controller/sidecarset/sidecarset_controller_test.go index 7d9e788cc0..7a3c2448f2 100644 --- a/pkg/controller/sidecarset/sidecarset_controller_test.go +++ b/pkg/controller/sidecarset/sidecarset_controller_test.go @@ -8,12 +8,15 @@ import ( "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util/expectations" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -51,6 +54,7 @@ var ( //Partition: &partition, //MaxUnavailable: &maxUnavailable, }, + RevisionHistoryLimit: utilpointer.Int32Ptr(10), }, } @@ -127,6 +131,9 @@ var ( func init() { scheme = runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = appsv1alpha1.AddToScheme(clientgoscheme.Scheme) + _ = appsv1.AddToScheme(scheme) _ = appsv1alpha1.AddToScheme(scheme) _ = corev1.AddToScheme(scheme) } diff --git a/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go b/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go index a482a668f2..eeb2e55cbe 100644 --- a/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go +++ b/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" + utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -61,7 +62,8 @@ var ( Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"app": "nginx"}, }, - UpdateStrategy: appsv1alpha1.SidecarSetUpdateStrategy{}, + UpdateStrategy: appsv1alpha1.SidecarSetUpdateStrategy{}, + RevisionHistoryLimit: utilpointer.Int32Ptr(10), }, } diff --git a/pkg/controller/sidecarset/sidecarset_processor.go b/pkg/controller/sidecarset/sidecarset_processor.go index efebe3fc27..812a358962 100644 --- a/pkg/controller/sidecarset/sidecarset_processor.go +++ b/pkg/controller/sidecarset/sidecarset_processor.go @@ -27,13 +27,21 @@ import ( "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util" "github.com/openkruise/kruise/pkg/util/expectations" + historyutil "github.com/openkruise/kruise/pkg/util/history" + webhookutil "github.com/openkruise/kruise/pkg/webhook/util" + apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller/history" + "k8s.io/utils/integer" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -41,6 +49,7 @@ import ( type Processor struct { Client client.Client recorder record.EventRecorder + historyController history.Interface updateExpectations expectations.UpdateExpectations } @@ -49,6 +58,7 @@ func NewSidecarSetProcessor(cli client.Client, expectations expectations.UpdateE Client: cli, updateExpectations: expectations, recorder: rec, + historyController: historyutil.NewHistory(cli), } } @@ -65,12 +75,21 @@ func (p *Processor) UpdateSidecarSet(sidecarSet *appsv1alpha1.SidecarSet) (recon return reconcile.Result{}, err } - // 2. calculate SidecarSet status based on pods - status := calculateStatus(control, pods) + // register new revision if this sidecarSet is the latest; + // return the latest revision that corresponds to this sidecarSet. + latestRevision, collisionCount, err := p.registerLatestRevision(sidecarSet, pods) + if latestRevision == nil { + klog.Errorf("sidecarSet register the latest revision error, err: %v, name: %s", err, sidecarSet.Name) + return reconcile.Result{}, err + } + + // 2. calculate SidecarSet status based on pod and revision information + status := calculateStatus(control, pods, latestRevision, collisionCount) //update sidecarSet status in store if err := p.updateSidecarSetStatus(sidecarSet, status); err != nil { return reconcile.Result{}, err } + sidecarSet.Status = *status // in case of informer cache latency for _, pod := range pods { @@ -285,13 +304,151 @@ func (p *Processor) getSelectedPods(namespaces []string, selector labels.Selecto return } +func (p *Processor) registerLatestRevision(sidecarSet *appsv1alpha1.SidecarSet, pods []*corev1.Pod) ( + latestRevision *apps.ControllerRevision, collisionCount int32, err error, +) { + // get revision selector of this sidecarSet + hc := sidecarcontrol.NewHistoryControl(p.Client) + selector, err := util.GetFastLabelSelector( + hc.GetRevisionLabelSelector(sidecarSet), + ) + if err != nil { + klog.Errorf("Failed to convert labels to selector, err %v, name %v", err, sidecarSet.Name) + return nil, collisionCount, nil + } + + // list all revisions + revisions, err := p.historyController.ListControllerRevisions(sidecarSet, selector) + if err != nil { + klog.Errorf("Failed to list history controllerRevisions, err %v, name %v", err, sidecarSet.Name) + return nil, collisionCount, err + } + + // sort revisions by increasing .Revision + history.SortControllerRevisions(revisions) + revisionCount := len(revisions) + + if sidecarSet.Status.CollisionCount != nil { + collisionCount = *sidecarSet.Status.CollisionCount + } + + // build a new revision from the current sidecarSet, + // the namespace of sidecarset revision must have very strict permissions for average users. + // Here use the namespace of kruise-manager. + latestRevision, err = hc.NewRevision(sidecarSet, webhookutil.GetNamespace(), hc.NextRevision(revisions), &collisionCount) + if err != nil { + return nil, collisionCount, err + } + + // find any equivalent revisions + equalRevisions := history.FindEqualRevisions(revisions, latestRevision) + equalCount := len(equalRevisions) + + if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) { + // if the equivalent revision is immediately prior the update revision has not changed + // in case of no change + latestRevision = revisions[revisionCount-1] + } else if equalCount > 0 { + // if the equivalent revision is not immediately prior we will roll back by incrementing the + // Revision of the equivalent revision + // in case of roll back + latestRevision, err = p.historyController.UpdateControllerRevision(equalRevisions[equalCount-1], latestRevision.Revision) + if err != nil { + return nil, collisionCount, err + } + // the updated revision may be deleted if we don't replace this revision. + replaceRevision(revisions, equalRevisions[equalCount-1], latestRevision) + } else { + // if there is no equivalent revision we create a new one + // in case of the sidecarSet update + latestRevision, err = hc.CreateControllerRevision(sidecarSet, latestRevision, &collisionCount) + if err != nil { + return nil, collisionCount, err + } + revisions = append(revisions, latestRevision) + } + + // only store limited history revisions + if err = p.truncateHistory(revisions, sidecarSet, pods); err != nil { + klog.Errorf("Failed to truncate history for %s: err: %v", sidecarSet.Name, err) + } + + return latestRevision, collisionCount, nil +} + +func (p *Processor) truncateHistory(revisions []*apps.ControllerRevision, s *appsv1alpha1.SidecarSet, pods []*corev1.Pod) error { + // We do not delete the latest revision because we are using it. + // Thus, we must ensure the limitation is bounded, minimum value is 1. + limitation := 10 + if s.Spec.RevisionHistoryLimit != nil { + limitation = integer.IntMax(1, int(*s.Spec.RevisionHistoryLimit)) + } + // if no need to truncate + revisionCount := len(revisions) + if revisionCount <= limitation { + return nil + } + + klog.V(3).Infof("Find %v revisions more than limitation %v, name: %v", revisionCount, limitation, s.Name) + + // the number of revisions need to delete + deletionCount := revisionCount - limitation + // only delete the revisions that no pods use. + activeRevisions := filterActiveRevisions(s, pods) + for i := 0; i < revisionCount-1 && deletionCount > 0; i++ { + if !activeRevisions.Has(revisions[i].Name) { // && revision.InjectionStrategy.ControllerRevision != revisions[i].Name + if err := p.historyController.DeleteControllerRevision(revisions[i]); err != nil && !errors.IsNotFound(err) { + return err + } + deletionCount-- + } + } + + // Sometime we cannot ensure the number of stored revisions is within the limitation because of the use by pods. + if deletionCount > 0 { + return fmt.Errorf("failed to limit the number of stored revisions, limited: %d, actual: %d, name: %s", limitation, limitation+deletionCount, s.Name) + } + return nil +} + +func filterActiveRevisions(s *appsv1alpha1.SidecarSet, pods []*corev1.Pod) sets.String { + activeRevisions := sets.NewString() + for _, pod := range pods { + if revision := sidecarcontrol.GetPodSidecarSetControllerRevision(s.Name, pod); revision != "" { + activeRevisions.Insert(revision) + } + } + return activeRevisions +} + +// replaceRevision will remove old from revisions, and add new to the end of revisions. +// This function keeps the order of revisions. +func replaceRevision(revisions []*apps.ControllerRevision, oldOne, newOne *apps.ControllerRevision) { + revisionCount := len(revisions) + if revisionCount == 0 || oldOne == nil { + return + } + // remove old revision from revisions + found := revisions[0] == oldOne + for i := 0; i < revisionCount-1; i++ { + if found { + revisions[i] = revisions[i+1] + } else if revisions[i+1] == oldOne { + found = true + } + } + // add this new revision to the end of revisions + revisions[revisionCount-1] = newOne +} + // calculate SidecarSet status // MatchedPods: all matched pods number // UpdatedPods: updated pods number // ReadyPods: ready pods number // UpdatedReadyPods: updated and ready pods number // UnavailablePods: MatchedPods - UpdatedReadyPods -func calculateStatus(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) *appsv1alpha1.SidecarSetStatus { +func calculateStatus(control sidecarcontrol.SidecarControl, pods []*corev1.Pod, latestRevision *apps.ControllerRevision, collisionCount int32, +) *appsv1alpha1.SidecarSetStatus { sidecarset := control.GetSidecarset() var matchedPods, updatedPods, readyPods, updatedAndReady int32 matchedPods = int32(len(pods)) @@ -313,6 +470,8 @@ func calculateStatus(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) UpdatedPods: updatedPods, ReadyPods: readyPods, UpdatedReadyPods: updatedAndReady, + LatestRevision: latestRevision.Name, + CollisionCount: pointer.Int32Ptr(collisionCount), } } @@ -393,7 +552,9 @@ func inconsistentStatus(sidecarSet *appsv1alpha1.SidecarSet, status *appsv1alpha status.MatchedPods != sidecarSet.Status.MatchedPods || status.UpdatedPods != sidecarSet.Status.UpdatedPods || status.ReadyPods != sidecarSet.Status.ReadyPods || - status.UpdatedReadyPods != sidecarSet.Status.UpdatedReadyPods + status.UpdatedReadyPods != sidecarSet.Status.UpdatedReadyPods || + status.LatestRevision != sidecarSet.Status.LatestRevision || + status.CollisionCount != sidecarSet.Status.CollisionCount } func isSidecarSetUpdateFinish(status *appsv1alpha1.SidecarSetStatus) bool { diff --git a/pkg/controller/sidecarset/sidecarset_processor_test.go b/pkg/controller/sidecarset/sidecarset_processor_test.go index 71c0b08f60..7f4d2b01a7 100644 --- a/pkg/controller/sidecarset/sidecarset_processor_test.go +++ b/pkg/controller/sidecarset/sidecarset_processor_test.go @@ -18,17 +18,22 @@ package sidecarset import ( "context" + "encoding/json" "fmt" + "strconv" "testing" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util" "github.com/openkruise/kruise/pkg/util/expectations" + webhookutil "github.com/openkruise/kruise/pkg/webhook/util" + apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/controller/history" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -304,3 +309,215 @@ func TestCanUpgradePods(t *testing.T) { } } } + +func TestGetActiveRevisions(t *testing.T) { + sidecarSet := factorySidecarSet() + sidecarSet.SetUID("1223344") + kubeSysNs := &corev1.Namespace{} + //Note that webhookutil.GetNamespace() return "" here + kubeSysNs.SetName(webhookutil.GetNamespace()) + kubeSysNs.SetNamespace(webhookutil.GetNamespace()) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet, kubeSysNs).Build() + exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) + processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + + // case 1 + latestRevision, _, err := processor.registerLatestRevision(sidecarSet, nil) + if err != nil || latestRevision == nil || latestRevision.Revision != int64(1) { + t.Fatalf("in case of create: get active revision failed when the latest revision = 1, err: %v, actual latestRevision: %v", + err, latestRevision.Revision) + } + + // case 2 + newSidecar := sidecarSet.DeepCopy() + newSidecar.Spec.InitContainers = []appsv1alpha1.SidecarContainer{ + {Container: corev1.Container{Name: "emptyInitC"}}, + } + newSidecar.Spec.Volumes = []corev1.Volume{ + {Name: "emptyVolume"}, + } + newSidecar.Spec.ImagePullSecrets = []corev1.LocalObjectReference{ + {Name: "emptySecret"}, + } + for i := 0; i < 5; i++ { + latestRevision, _, err = processor.registerLatestRevision(newSidecar, nil) + if err != nil || latestRevision == nil || latestRevision.Revision != int64(2) { + t.Fatalf("in case of update: get active revision failed when the latest revision = 2, err: %v, actual latestRevision: %v", + err, latestRevision.Revision) + } + revision := make(map[string]interface{}) + if err := json.Unmarshal(latestRevision.Data.Raw, &revision); err != nil { + t.Fatalf("failed to decode revision, err: %v", err) + } + spec := revision["spec"].(map[string]interface{}) + _, ok1 := spec["volumes"] + _, ok2 := spec["containers"] + _, ok3 := spec["initContainers"] + _, ok4 := spec["imagePullSecrets"] + if !(ok1 && ok2 && ok3 && ok4) { + t.Fatalf("failed to store revision, err: %v", err) + } + } + + // case 3 + for i := 0; i < 5; i++ { + latestRevision, _, err = processor.registerLatestRevision(sidecarSet, nil) + if err != nil || latestRevision == nil || latestRevision.Revision != int64(3) { + t.Fatalf("in case of rollback: get active revision failed when the latest revision = 3, err: %v, actual latestRevision: %v", + err, latestRevision.Revision) + } + } + + // case 4 + for i := 0; i < 100; i++ { + sidecarSet.Spec.Containers[0].Image = fmt.Sprintf("%d", i) + if _, _, err = processor.registerLatestRevision(sidecarSet, nil); err != nil { + t.Fatalf("unexpected error, err: %v", err) + } + } + revisionList := &apps.ControllerRevisionList{} + processor.Client.List(context.TODO(), revisionList) + if len(revisionList.Items) != int(*sidecarSet.Spec.RevisionHistoryLimit) { + t.Fatalf("in case of maxStoredRevisions: get wrong number of revisions, expected %d, actual %d", *sidecarSet.Spec.RevisionHistoryLimit, len(revisionList.Items)) + } +} + +func TestReplaceRevision(t *testing.T) { + const TotalRevisions int = 10 + var revisions, pick []*apps.ControllerRevision + // init revision slice + for i := 1; i <= TotalRevisions; i++ { + rv := &apps.ControllerRevision{} + rv.Revision = int64(i) + pick = append(pick, rv) + } + + // check whether the list is ordered + check := func(list []*apps.ControllerRevision) bool { + if len(list) != TotalRevisions { + return false + } + for i := 1; i < TotalRevisions; i++ { + if list[i].Revision < list[i-1].Revision { + return false + } + } + return true + } + + // reset revisions + reset := func() { + revisions = make([]*apps.ControllerRevision, 0) + revisions = append(revisions, pick...) + } + + newOne := &apps.ControllerRevision{} + newOne.Revision = int64(TotalRevisions + 1) + for i := 0; i < TotalRevisions; i++ { + reset() + replaceRevision(revisions, pick[i], newOne) + if !check(revisions) { + t.Fatalf("replaceRevision failed when replacing the %d-th item of %d", i, TotalRevisions) + } + } +} + +func TestTruncateHistory(t *testing.T) { + sidecarSet := factorySidecarSet() + sidecarSet.SetName("sidecar") + sidecarSet.SetUID("1223344") + if sidecarSet.Spec.Selector.MatchLabels == nil { + sidecarSet.Spec.Selector.MatchLabels = make(map[string]string) + } + kubeSysNs := &corev1.Namespace{} + kubeSysNs.SetName(webhookutil.GetNamespace()) //Note that util.GetKruiseManagerNamespace() return "" here + kubeSysNs.SetNamespace(webhookutil.GetNamespace()) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet, kubeSysNs).Build() + exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) + processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + + getName := func(i int) string { + return "sidecar-" + strconv.Itoa(i) + } + reset := func(usedCount int) ([]*corev1.Pod, []*apps.ControllerRevision) { + pods := factoryPodsCommon(100, 0, sidecarSet) + for i := range pods { + sidecarSetHash := make(map[string]sidecarcontrol.SidecarSetUpgradeSpec) + sidecarSetHash[sidecarSet.Name] = sidecarcontrol.SidecarSetUpgradeSpec{ + SidecarSetControllerRevision: getName(i%usedCount + 1), + } + by, _ := json.Marshal(&sidecarSetHash) + pods[i].Annotations[sidecarcontrol.SidecarSetHashAnnotation] = string(by) + } + revisions := make([]*apps.ControllerRevision, 0) + for i := 1; i <= 15; i++ { + rv := &apps.ControllerRevision{} + rv.SetName(getName(i)) + rv.SetNamespace(webhookutil.GetNamespace()) + rv.Revision = int64(i) + revisions = append(revisions, rv) + } + return pods, revisions + } + + stderr := func(num int) string { + return fmt.Sprintf("failed to limit the number of stored revisions, limited: 10, actual: %d, name: sidecar", num) + } + + // check successful cases + for i := 1; i <= 9; i++ { + pods, revisions := reset(i) + err := processor.truncateHistory(revisions, sidecarSet, pods) + if err != nil { + t.Fatalf("expected revision len: %d, err: %v", 10, err) + } + } + + // check failed cases + failedCases := []int{10, 11, 14, 15, 20} + expectedResults := []int{11, 12, 15, 15, 15} + for i := range failedCases { + pods, revisions := reset(failedCases[i]) + err := processor.truncateHistory(revisions, sidecarSet, pods) + if err == nil || err.Error() != stderr(expectedResults[i]) { + t.Fatalf("expected revision len: %d, err: %v", expectedResults[i], err) + } + } + + // check revisions exactly + pods, revisions := reset(8) + for _, rv := range revisions { + if err := processor.Client.Create(context.TODO(), rv); err != nil { + t.Fatalf("failed to create revisions") + } + } + if err := processor.truncateHistory(revisions, sidecarSet, pods); err != nil { + t.Fatalf("failed to truncate revisions, err %v", err) + } + list := &apps.ControllerRevisionList{} + if err := processor.Client.List(context.TODO(), list); err != nil { + t.Fatalf("failed to list revisions, err %v", err) + } + if len(list.Items) != 10 { + t.Fatalf("expected revision len: %d, actual: %d", 10, len(list.Items)) + } + // sort history by revision field + rvs := make([]*apps.ControllerRevision, 0) + for i := range list.Items { + rvs = append(rvs, &list.Items[i]) + } + history.SortControllerRevisions(rvs) + + expected := make(map[int]int) + for i := 0; i < 8; i++ { + if rvs[i].Name != getName(i+1) { + t.Fatalf("expected name %s, actual : %s", getName(expected[i]), rvs[i].Name) + } + } + if rvs[8].Name != getName(14) { + t.Fatalf("expected name %s, actual : %s", getName(14), rvs[8].Name) + } + if rvs[9].Name != getName(15) { + t.Fatalf("expected name %s, actual : %s", getName(15), rvs[9].Name) + } +} diff --git a/pkg/controller/sidecarset/sidecarset_strategy_test.go b/pkg/controller/sidecarset/sidecarset_strategy_test.go index e18a8e9773..2f4469dd99 100644 --- a/pkg/controller/sidecarset/sidecarset_strategy_test.go +++ b/pkg/controller/sidecarset/sidecarset_strategy_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + utilpointer "k8s.io/utils/pointer" ) type FactorySidecarSet func() *appsv1alpha1.SidecarSet @@ -132,6 +133,7 @@ func factorySidecarSet() *appsv1alpha1.SidecarSet { UpdateStrategy: appsv1alpha1.SidecarSetUpdateStrategy{ //Type: appsv1alpha1.RollingUpdateSidecarSetStrategyType, }, + RevisionHistoryLimit: utilpointer.Int32Ptr(10), }, } diff --git a/test/e2e/apps/sidecarset.go b/test/e2e/apps/sidecarset.go index d294b514dd..701fffc726 100644 --- a/test/e2e/apps/sidecarset.go +++ b/test/e2e/apps/sidecarset.go @@ -18,6 +18,7 @@ package apps import ( "context" + "encoding/json" "fmt" "reflect" "time" @@ -36,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/controller/history" utilpointer "k8s.io/utils/pointer" ) @@ -955,5 +957,70 @@ var _ = SIGDescribe("SidecarSet", func() { gomega.Expect(hash1).To(gomega.Equal(hash2)) ginkgo.By(fmt.Sprintf("sidecarSet upgrade init sidecar container, and don't upgrade done")) }) + + ginkgo.It("sidecarSet history revision checker", func() { + // check function + revisionChecker := func(s *appsv1alpha1.SidecarSet, expectedCount int, expectedOrder []int64) { + list := tester.ListControllerRevisions(s) + // check the number of revisions + gomega.Expect(list).To(gomega.HaveLen(expectedCount)) + for _, revision := range list { + // check fields of revision + mice := make(map[string]interface{}) + err := json.Unmarshal(revision.Data.Raw, &mice) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + spec := mice["spec"].(map[string]interface{}) + _, ok1 := spec["volumes"] + _, ok2 := spec["containers"] + _, ok3 := spec["initContainers"] + _, ok4 := spec["imagePullSecrets"] + gomega.Expect(ok1 && ok2 && ok3 && ok4).To(gomega.BeTrue()) + } + if expectedOrder == nil { + return + } + gomega.Expect(list).To(gomega.HaveLen(len(expectedOrder))) + history.SortControllerRevisions(list) + for i := range list { + gomega.Expect(list[i].Revision).To(gomega.Equal(expectedOrder[i])) + } + } + + waitingForSidecarSetReconcile := func(name string) { + gomega.Eventually(func() bool { + sidecarSet, err := kc.AppsV1alpha1().SidecarSets().Get(context.TODO(), name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return sidecarSet.Status.ObservedGeneration == sidecarSet.Generation + }, 10*time.Second, time.Second).Should(gomega.BeTrue()) + } + + ginkgo.By("check after sidecarset creating...") + sidecarSetIn := tester.NewBaseSidecarSet(ns) + sidecarSetIn.SetName("e2e-test-for-history-revisions") + sidecarSetIn.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "secret-1"}} + ginkgo.By(fmt.Sprintf("Creating SidecarSet %s", sidecarSetIn.Name)) + sidecarSetIn = tester.CreateSidecarSet(sidecarSetIn) + waitingForSidecarSetReconcile(sidecarSetIn.Name) + revisionChecker(sidecarSetIn, 1, nil) + + // update sidecarSet and stored revisions + ginkgo.By("check after sidecarset updating 15 times...") + for i := 2; i <= 15; i++ { + sidecarSetIn.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: fmt.Sprintf("secret-%d", i)}} + tester.UpdateSidecarSet(sidecarSetIn) + waitingForSidecarSetReconcile(sidecarSetIn.Name) + } + // expected order after update + expectedOrder := []int64{6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + revisionChecker(sidecarSetIn, 10, expectedOrder) + + ginkgo.By("check after sidecarset updating by using old revision...") + sidecarSetIn.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: fmt.Sprintf("secret-%d", 12)}} + tester.UpdateSidecarSet(sidecarSetIn) + waitingForSidecarSetReconcile(sidecarSetIn.Name) + expectedOrder = []int64{6, 7, 8, 9, 10, 11, 13, 14, 15, 16} + revisionChecker(sidecarSetIn, 10, expectedOrder) + ginkgo.By(fmt.Sprintf("sidecarSet history revision check done")) + }) }) }) diff --git a/test/e2e/framework/sidecarset_utils.go b/test/e2e/framework/sidecarset_utils.go index 424b53be0f..b8ca38d0cb 100644 --- a/test/e2e/framework/sidecarset_utils.go +++ b/test/e2e/framework/sidecarset_utils.go @@ -22,7 +22,9 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util" + webhookutil "github.com/openkruise/kruise/pkg/webhook/util" "github.com/onsi/gomega" apps "k8s.io/api/apps/v1" @@ -403,3 +405,17 @@ func (t *SidecarSetTester) WaitForCloneSetRunning(cloneset *appsv1alpha1.CloneSe Failf("Failed waiting for cloneset to enter running: %v", pollErr) } } + +func (t *SidecarSetTester) ListControllerRevisions(sidecarSet *appsv1alpha1.SidecarSet) []*apps.ControllerRevision { + selector, err := util.GetFastLabelSelector(&metav1.LabelSelector{MatchLabels: map[string]string{ + sidecarcontrol.SidecarSetKindName: sidecarSet.Name, + }}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + revisionList, err := t.c.AppsV1().ControllerRevisions(webhookutil.GetNamespace()).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + revisions := make([]*apps.ControllerRevision, len(revisionList.Items)) + for i := range revisionList.Items { + revisions[i] = &revisionList.Items[i] + } + return revisions +}