Skip to content

Commit

Permalink
Optimize inplace update and support pod-template-hash label for clone…
Browse files Browse the repository at this point in the history
…set (#931)

Signed-off-by: FillZpp <[email protected]>
  • Loading branch information
FillZpp authored Mar 21, 2022
1 parent 02f8fe6 commit 3b0a040
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 107 deletions.
44 changes: 13 additions & 31 deletions pkg/controller/cloneset/cloneset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
} else {
klog.Errorf("Failed syncing CloneSet %s: %v", request, retErr)
}
// clean the duration store
_ = clonesetutils.DurationStore.Pop(request.String())
}()

// Fetch the CloneSet instance
Expand All @@ -202,7 +204,6 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
// For additional cleanup logic use finalizers.
klog.V(3).Infof("CloneSet %s has been deleted.", request)
clonesetutils.ScaleExpectations.DeleteExpectations(request.String())
clonesetutils.UpdateExpectations.DeleteExpectations(request.String())
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
Expand Down Expand Up @@ -256,20 +257,6 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
return reconcile.Result{}, err
}

// Refresh update expectations
for _, pod := range filteredPods {
clonesetutils.UpdateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)
}
// If update expectations have not satisfied yet, just skip this reconcile.
if updateSatisfied, unsatisfiedDuration, updateDirtyPods := clonesetutils.UpdateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {
if unsatisfiedDuration >= expectations.ExpectationTimeout {
klog.Warningf("Expectation unsatisfied overtime for %v, updateDirtyPods=%v, timeout=%v", request.String(), updateDirtyPods, unsatisfiedDuration)
return reconcile.Result{}, nil
}
klog.V(4).Infof("Not satisfied update for %v, updateDirtyPods=%v", request.String(), updateDirtyPods)
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}

// If resourceVersion expectations have not satisfied yet, just skip this reconcile
clonesetutils.ResourceVersionExpectations.Observe(updateRevision)
if isSatisfied, unsatisfiedDuration := clonesetutils.ResourceVersionExpectations.IsSatisfied(updateRevision); !isSatisfied {
Expand Down Expand Up @@ -331,7 +318,7 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
}

// scale and update pods
delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)

// update new status
if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {
Expand All @@ -347,32 +334,28 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
}

if syncErr == nil && instance.Spec.MinReadySeconds > 0 && newStatus.AvailableReplicas != newStatus.ReadyReplicas {
minReadyDuration := time.Second * time.Duration(instance.Spec.MinReadySeconds)
if delayDuration == 0 || minReadyDuration < delayDuration {
delayDuration = minReadyDuration
}
clonesetutils.DurationStore.Push(request.String(), time.Second*time.Duration(instance.Spec.MinReadySeconds))
}
return reconcile.Result{RequeueAfter: delayDuration}, syncErr
return reconcile.Result{RequeueAfter: clonesetutils.DurationStore.Pop(request.String())}, syncErr
}

func (r *ReconcileCloneSet) syncCloneSet(
instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
var delayDuration time.Duration
) error {
if instance.DeletionTimestamp != nil {
return delayDuration, nil
return nil
}

// get the current and update revisions of the set.
currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)
if err != nil {
return delayDuration, err
return err
}
updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)
if err != nil {
return delayDuration, err
return err
}

var scaling bool
Expand All @@ -390,24 +373,23 @@ func (r *ReconcileCloneSet) syncCloneSet(
err = podsScaleErr
}
if scaling {
return delayDuration, podsScaleErr
return podsScaleErr
}

delayDuration, podsUpdateErr = r.syncControl.Update(updateSet, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
podsUpdateErr = r.syncControl.Update(updateSet, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
if podsUpdateErr != nil {
newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
Type: appsv1alpha1.CloneSetConditionFailedUpdate,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: podsUpdateErr.Error(),
})
// If these is a delay duration, need not to return error to outside
if err == nil && delayDuration <= 0 {
if err == nil {
err = podsUpdateErr
}
}

return delayDuration, err
return err
}

func (r *ReconcileCloneSet) getActiveRevisions(cs *appsv1alpha1.CloneSet, revisions []*apps.ControllerRevision) (
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/cloneset/cloneset_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting

klog.V(4).Infof("Pod %s/%s deleted, owner: %s", pod.Namespace, pod.Name, req.Name)
clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Delete, pod.Name)
clonesetutils.UpdateExpectations.DeleteObject(req.String(), pod)
q.Add(*req)
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/cloneset/sync/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package sync

import (
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
Expand All @@ -41,7 +39,7 @@ type Interface interface {
Update(cs *appsv1alpha1.CloneSet,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error)
) error
}

type realControl struct {
Expand Down
27 changes: 15 additions & 12 deletions pkg/controller/cloneset/sync/cloneset_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func TestCreatePods(t *testing.T) {
Name: "foo-id1",
GenerateName: "foo-",
Labels: map[string]string{
appsv1alpha1.CloneSetInstanceID: "id1",
apps.ControllerRevisionHashLabelKey: "revision_abc",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
appsv1alpha1.CloneSetInstanceID: "id1",
apps.ControllerRevisionHashLabelKey: "revision_abc",
apps.DefaultDeploymentUniqueLabelKey: "revision_abc",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down Expand Up @@ -136,10 +137,11 @@ func TestCreatePods(t *testing.T) {
Name: "foo-id3",
GenerateName: "foo-",
Labels: map[string]string{
appsv1alpha1.CloneSetInstanceID: "id3",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
appsv1alpha1.CloneSetInstanceID: "id3",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
apps.DefaultDeploymentUniqueLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down Expand Up @@ -193,10 +195,11 @@ func TestCreatePods(t *testing.T) {
Name: "foo-id4",
GenerateName: "foo-",
Labels: map[string]string{
appsv1alpha1.CloneSetInstanceID: "id4",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
appsv1alpha1.CloneSetInstanceID: "id4",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
apps.DefaultDeploymentUniqueLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down
58 changes: 41 additions & 17 deletions pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sync

import (
"context"
"fmt"
"sort"
"time"
Expand All @@ -32,48 +33,54 @@ import (
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/requeueduration"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
"github.com/openkruise/kruise/pkg/util/updatesort"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
) error {

requeueDuration := requeueduration.Duration{}
key := clonesetutils.GetControllerKey(cs)
coreControl := clonesetcore.New(cs)

// 1. refresh states for all pods
var modified bool
for _, pod := range pods {
patched, duration, err := c.refreshPodState(cs, coreControl, pod)
patchedState, duration, err := c.refreshPodState(cs, coreControl, pod)
if err != nil {
return 0, err
return err
} else if duration > 0 {
requeueDuration.Update(duration)
clonesetutils.DurationStore.Push(key, duration)
}
// fix the pod-template-hash label for old pods before v1.1
patchedHash, err := c.fixPodTemplateHashLabel(cs, pod)
if err != nil {
return err
}
if patched {
if patchedState || patchedHash {
modified = true
}
}
if modified {
return requeueDuration.Get(), nil
return nil
}

if cs.Spec.UpdateStrategy.Paused {
return requeueDuration.Get(), nil
return nil
}

// 2. calculate update diff and the revision to update
diffRes := calculateDiffsWithExpectation(cs, pods, currentRevision.Name, updateRevision.Name)
if diffRes.updateNum == 0 {
return requeueDuration.Get(), nil
return nil
}

// 3. find all matched pods can update
Expand Down Expand Up @@ -128,7 +135,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) && len(waitUpdateIndexes) > 0 {
pub, err = pubcontrol.GetPodUnavailableBudgetForPod(c.Client, c.controllerFinder, pods[waitUpdateIndexes[0]])
if err != nil {
return requeueDuration.Get(), err
return err
}
}
// 6. update pods
Expand All @@ -138,22 +145,23 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
if pub != nil {
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, pod, pubcontrol.NewPubControl(pub, c.controllerFinder, c.Client), pubcontrol.UpdateOperation, false)
if err != nil {
return requeueDuration.Get(), err
return err
// pub check does not pass, try again in seconds
} else if !allowed {
return time.Second, nil
clonesetutils.DurationStore.Push(key, time.Second)
return nil
}
}
duration, err := c.updatePod(cs, coreControl, targetRevision, revisions, pod, pvcs)
if duration > 0 {
requeueDuration.Update(duration)
clonesetutils.DurationStore.Push(key, duration)
}
if err != nil {
return requeueDuration.Get(), err
return err
}
}

return requeueDuration.Get(), nil
return nil
}

func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control, pod *v1.Pod) (bool, time.Duration, error) {
Expand Down Expand Up @@ -198,6 +206,23 @@ func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clo
return false, res.DelayDuration, nil
}

// fix the pod-template-hash label for old pods before v1.1
func (c *realControl) fixPodTemplateHashLabel(cs *appsv1alpha1.CloneSet, pod *v1.Pod) (bool, error) {
if _, exists := pod.Labels[apps.DefaultDeploymentUniqueLabelKey]; exists {
return false, nil
}
patch := []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`,
apps.DefaultDeploymentUniqueLabelKey,
clonesetutils.GetShortHash(pod.Labels[apps.ControllerRevisionHashLabelKey])))
pod = pod.DeepCopy()
if err := c.Patch(context.TODO(), pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
klog.Warningf("CloneSet %s/%s failed to fix pod-template-hash to Pod %s: %v", cs.Namespace, cs.Name, pod.Name, err)
return false, err
}
clonesetutils.ResourceVersionExpectations.Expect(pod)
return true, nil
}

func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
Expand Down Expand Up @@ -257,7 +282,6 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc
if res.UpdateErr == nil {
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place(revision %v)", pod.Name, updateRevision.Name)
clonesetutils.ResourceVersionExpectations.Expect(&metav1.ObjectMeta{UID: pod.UID, ResourceVersion: res.NewResourceVersion})
clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
return res.DelayDuration, nil
}

Expand Down
Loading

0 comments on commit 3b0a040

Please sign in to comment.