Skip to content

Commit

Permalink
support bluegreen release: webhook update
Browse files Browse the repository at this point in the history
Signed-off-by: yunbo <[email protected]>
  • Loading branch information
Funinu committed Dec 4, 2024
1 parent e8f7f0c commit b8bd5b5
Show file tree
Hide file tree
Showing 10 changed files with 469 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,6 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
}
patchData := patch.NewClonesetPatch()
// why we need a simple MinReadySeconds-based status machine? (ie. the if-else block)
// It's possible for Finalize to be called multiple times, if error returned is not nil.
// if we do all needed operations in a single code block, like, A->B->C, when C need retry,
// both A and B will be executed as well, however, operations like restoreHPA cost a lot(which calls LIST API)
if rc.object.Spec.MinReadySeconds != setting.MinReadySeconds {
// restore the hpa
if err := hpa.RestoreHPA(rc.client, rc.object); err != nil {
Expand All @@ -170,21 +166,18 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
if err := rc.client.Patch(context.TODO(), c, patchData); err != nil {
return err
}
// we should return an error to trigger re-enqueue, so that we can go to the next if-else branch in the next reconcile
return errors.NewBenignError(fmt.Errorf("cloneset bluegreen: we should wait all pods updated and available"))
} else {
klog.InfoS("Finalize: cloneset bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
// wait all pods updated and ready
if rc.object.Status.ReadyReplicas != rc.object.Status.UpdatedReadyReplicas {
return errors.NewBenignError(fmt.Errorf("cloneset %v finalize not done, readyReplicas %d != updatedReadyReplicas %d, current policy %s",
klog.KObj(rc.object), rc.object.Status.ReadyReplicas, rc.object.Status.UpdatedReadyReplicas, release.Spec.ReleasePlan.FinalizingPolicy))
}
klog.InfoS("Finalize: cloneset bluegreen release: all pods updated and ready")
// restore annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
return rc.client.Patch(context.TODO(), c, patchData)
}
klog.InfoS("Finalize: cloneset bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
// wait all pods updated and ready
if rc.object.Status.ReadyReplicas != rc.object.Status.UpdatedReadyReplicas {
return errors.NewBenignError(fmt.Errorf("cloneset %v finalize not done, readyReplicas %d != updatedReadyReplicas %d, current policy %s",
klog.KObj(rc.object), rc.object.Status.ReadyReplicas, rc.object.Status.UpdatedReadyReplicas, release.Spec.ReleasePlan.FinalizingPolicy))
}
klog.InfoS("Finalize: cloneset bluegreen release: all pods updated and ready")
// restore annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
return rc.client.Patch(context.TODO(), c, patchData)
}

func (rc *realController) finalized() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ type realController struct {
pods []*corev1.Pod
key types.NamespacedName
object *apps.Deployment
finder *util.ControllerFinder
}

// NewController returns a blue-green controller for the Deployment identified
// by key. The controller uses cli for API access and a ControllerFinder built
// from the same client. The GroupVersionKind argument is ignored.
func NewController(cli client.Client, key types.NamespacedName, _ schema.GroupVersionKind) bluegreenstyle.Interface {
	rc := &realController{}
	rc.key = key
	rc.client = cli
	rc.finder = util.NewControllerFinder(cli)
	return rc
}

Expand Down Expand Up @@ -82,39 +84,29 @@ func (rc *realController) ListOwnedPods() ([]*corev1.Pod, error) {
return rc.pods, err
}

// Add OriginalDeploymentStrategyAnnotation to workload
// Initialize prepares the Deployment for the BatchRelease process
func (rc *realController) Initialize(release *v1beta1.BatchRelease) error {
if rc.object == nil || control.IsControlledByBatchRelease(release, rc.object) {
return nil
}
// disable the hpa
// Disable the HPA
if err := hpa.DisableHPA(rc.client, rc.object); err != nil {
return err
}
klog.InfoS("Initialize: disable hpa for deployment successfully", "deployment", klog.KObj(rc.object))
// update the deployment
setting, err := control.GetOriginalSetting(rc.object)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
klog.InfoS("Initialize: disabled HPA for deployment successfully", "deployment", klog.KObj(rc.object))

// Patch minReadySeconds for stable ReplicaSet
if err := rc.patchStableRSMinReadySeconds(v1beta1.MaxReadySeconds); err != nil {
return err
}
control.InitOriginalSetting(&setting, rc.object)
klog.InfoS("Initialize deployment", "deployment", klog.KObj(rc.object), "setting", util.DumpJSON(&setting))
klog.InfoS("Initialize: patched minReadySeconds for stable replicaset successfully", "deployment", klog.KObj(rc.object))

patchData := patch.NewDeploymentPatch()
patchData.InsertAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation, util.DumpJSON(&setting))
patchData.InsertAnnotation(util.BatchReleaseControlAnnotation, util.DumpJSON(metav1.NewControllerRef(
release, release.GetObjectKind().GroupVersionKind())))
// update: MinReadySeconds, ProgressDeadlineSeconds, MaxSurge, MaxUnavailable
patchData.UpdateStrategy(apps.DeploymentStrategy{
Type: apps.RollingUpdateDeploymentStrategyType,
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0},
},
})
patchData.UpdateMinReadySeconds(v1beta1.MaxReadySeconds)
patchData.UpdateProgressDeadlineSeconds(utilpointer.Int32(v1beta1.MaxProgressSeconds))
return rc.client.Patch(context.TODO(), util.GetEmptyObjectWithKey(rc.object), patchData)
// Patch Deployment
if err := rc.patchDeployment(release); err != nil {
return err
}
klog.InfoS("Initialize: patched deployment successfully", "deployment", klog.KObj(rc.object))
return nil
}

func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
Expand All @@ -131,6 +123,7 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
klog.Infof("Ready to upgrade batch for deployment %v: current %d < desired %d", klog.KObj(rc.object), current, desired)
patchData := patch.NewDeploymentPatch()
// Unlike canary release, blue-green does not need to pause the Deployment during the rollout.
// Because our webhook may pause the Deployment in some situations, we ensure that the Deployment is not paused.
patchData.UpdatePaused(false)
patchData.UpdateStrategy(apps.DeploymentStrategy{
Type: apps.RollingUpdateDeploymentStrategyType,
Expand Down Expand Up @@ -166,10 +159,6 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
return errors.NewFatalError(fmt.Errorf("cannot get original setting for cloneset %v: %s from annotation", klog.KObj(rc.object), err.Error()))
}
patchData := patch.NewDeploymentPatch()
// why we need a simple MinReadySeconds-based status machine? (ie. the if-else block)
// It's possible for Finalize to be called multiple times, if error returned is not nil.
// if we do all needed operations in a single code block, like, A->B->C, when C need retry,
// both A and B will be executed as well, however, operations like restoreHPA cost a lot(which calls LIST API)
if rc.object.Spec.MinReadySeconds != setting.MinReadySeconds {
// restore the hpa
if err := hpa.RestoreHPA(rc.client, rc.object); err != nil {
Expand All @@ -184,21 +173,18 @@ func (rc *realController) Finalize(release *v1beta1.BatchRelease) error {
if err := rc.client.Patch(context.TODO(), d, patchData); err != nil {
return err
}
// we should return an error to trigger re-enqueue, so that we can go to the next if-else branch in the next reconcile
return errors.NewBenignError(fmt.Errorf("deployment bluegreen: we should wait all pods updated and available"))
} else {
klog.InfoS("Finalize: deployment bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
// wait all pods updated and ready
if err := waitAllUpdatedAndReady(d.(*apps.Deployment)); err != nil {
return errors.NewBenignError(err)
}
klog.InfoS("Finalize: deployment is ready to resume, restore the original setting", "deployment", klog.KObj(rc.object))
// restore label and annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
return rc.client.Patch(context.TODO(), d, patchData)
}
klog.InfoS("Finalize: deployment bluegreen release: wait all pods updated and ready", "cloneset", klog.KObj(rc.object))
// wait all pods updated and ready
if err := waitAllUpdatedAndReady(d.(*apps.Deployment)); err != nil {
return errors.NewBenignError(err)
}
klog.InfoS("Finalize: deployment is ready to resume, restore the original setting", "deployment", klog.KObj(rc.object))
// restore label and annotation
patchData.DeleteAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation)
patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel)
patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation)
return rc.client.Patch(context.TODO(), d, patchData)
}

func (rc *realController) finalized() bool {
Expand Down Expand Up @@ -301,3 +287,53 @@ func waitAllUpdatedAndReady(deployment *apps.Deployment) error {
}
return nil
}

// patchStableRSMinReadySeconds sets spec.minReadySeconds of the Deployment's
// stable ReplicaSet to seconds, using a JSON merge patch.
//
// Why: for the rollback scenario, the stable ReplicaSet's minReadySeconds is
// set to "infinity" so that pods of the stable ReplicaSet are treated as
// unavailable; otherwise pods of the new version would be terminated
// immediately when a rollback happens. We want to keep them until traffic is
// switched to the stable version.
//
// If no stable ReplicaSet is found, a warning is logged and nil is returned.
// Lookup and patch failures are returned as errors.
func (rc *realController) patchStableRSMinReadySeconds(seconds int32) error {
	stableRS, err := rc.finder.GetDeploymentStableRs(rc.object)
	if err != nil {
		return fmt.Errorf("failed to get stable ReplicaSet: %v", err)
	}
	if stableRS == nil {
		// Nothing to patch; this is not treated as an error.
		klog.Warningf("No stable ReplicaSet found for deployment %s/%s", rc.object.Namespace, rc.object.Name)
		return nil
	}

	body := fmt.Sprintf(`{"spec":{"minReadySeconds":%v}}`, seconds)
	if err := rc.client.Patch(context.TODO(), stableRS, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
		// Report the value that was actually requested, not the package constant.
		return fmt.Errorf("failed to patch ReplicaSet %s/%s minReadySeconds to %v: %v", stableRS.Namespace, stableRS.Name, seconds, err)
	}
	return nil
}

// patchDeployment prepares the Deployment for a blue-green release with a
// single patch. The patch:
//   - stores the setting obtained from control.GetOriginalSetting and
//     control.InitOriginalSetting as JSON in OriginalDeploymentStrategyAnnotation;
//   - stores a controller reference to release in BatchReleaseControlAnnotation;
//   - sets the strategy to RollingUpdate with MaxSurge=1 and MaxUnavailable=0;
//   - sets MinReadySeconds to v1beta1.MaxReadySeconds and
//     ProgressDeadlineSeconds to v1beta1.MaxProgressSeconds.
//
// A failure to read the original setting is returned as a fatal error; a
// failure to apply the patch is returned as a plain error.
func (rc *realController) patchDeployment(release *v1beta1.BatchRelease) error {
	original, err := control.GetOriginalSetting(rc.object)
	if err != nil {
		return errors.NewFatalError(fmt.Errorf("cannot get original setting for deployment %v: %s", klog.KObj(rc.object), err.Error()))
	}
	control.InitOriginalSetting(&original, rc.object)

	p := patch.NewDeploymentPatch()

	// Annotations: remembered original setting and the owning BatchRelease.
	p.InsertAnnotation(v1beta1.OriginalDeploymentStrategyAnnotation, util.DumpJSON(&original))
	ownerRef := metav1.NewControllerRef(release, release.GetObjectKind().GroupVersionKind())
	p.InsertAnnotation(util.BatchReleaseControlAnnotation, util.DumpJSON(ownerRef))

	// Strategy used while the release is in progress.
	rolling := &apps.RollingUpdateDeployment{
		MaxSurge:       &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
		MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0},
	}
	p.UpdateStrategy(apps.DeploymentStrategy{
		Type:          apps.RollingUpdateDeploymentStrategyType,
		RollingUpdate: rolling,
	})
	p.UpdateMinReadySeconds(v1beta1.MaxReadySeconds)
	p.UpdateProgressDeadlineSeconds(utilpointer.Int32(v1beta1.MaxProgressSeconds))

	// Apply the patch to the Deployment.
	target := util.GetEmptyObjectWithKey(rc.object)
	patchErr := rc.client.Patch(context.TODO(), target, p)
	if patchErr == nil {
		return nil
	}
	return fmt.Errorf("failed to patch deployment %v: %v", klog.KObj(rc.object), patchErr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ func TestRealController(t *testing.T) {

release := releaseDemo.DeepCopy()
clone := deploymentDemo.DeepCopy()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(release, clone).Build()
stableRs, canaryRs := makeStableReplicaSets(clone), makeCanaryReplicaSets(clone)
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(release, clone, stableRs, canaryRs).Build()
// build new controller
c := NewController(cli, deploymentKey, clone.GroupVersionKind()).(*realController)
controller, err := c.BuildController()
Expand All @@ -414,6 +415,10 @@ func TestRealController(t *testing.T) {
MinReadySeconds: 0,
ProgressDeadlineSeconds: pointer.Int32(600),
})))
// check minReadyseconds field of stable replicaset
fetchRS := &apps.ReplicaSet{}
Expect(cli.Get(context.TODO(), types.NamespacedName{Name: stableRs.GetName(), Namespace: stableRs.GetNamespace()}, fetchRS)).NotTo(HaveOccurred())
Expect(fetchRS.Spec.MinReadySeconds).Should(Equal(int32(v1beta1.MaxReadySeconds)))

c.object = fetch // mock

Expand Down
16 changes: 15 additions & 1 deletion pkg/controller/rollout/rollout_progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/openkruise/rollouts/api/v1beta1"
"github.com/openkruise/rollouts/pkg/trafficrouting"
"github.com/openkruise/rollouts/pkg/util"
utilerrors "github.com/openkruise/rollouts/pkg/util/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -110,7 +111,12 @@ func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1beta1.Rollout
case v1alpha1.ProgressingReasonInRolling:
klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
err = r.doProgressingInRolling(rolloutContext)
if err != nil {
if utilerrors.IsFatal(err) {
// For fatal errors, do not retry: retrying wastes resources and has no effect.
// Therefore, we do not propagate the error; we only log it.
// The user must intervene instead, e.g. in the blue-green continuous release scenario, the user should roll back first.
klog.Warningf("rollout(%s/%s) doProgressingInRolling error(%s)", rollout.Namespace, rollout.Name, err.Error())
} else if err != nil {
return nil, err
}

Expand Down Expand Up @@ -230,6 +236,14 @@ func (r *RolloutReconciler) handleContinuousRelease(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) workload continuous publishing canaryRevision from(%s) -> to(%s), then restart publishing",
c.Rollout.Namespace, c.Rollout.Name, c.NewStatus.GetCanaryRevision(), c.Workload.CanaryRevision)

// do nothing for blue-green release
if c.Rollout.Spec.Strategy.IsBlueGreenRelease() {
cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing)
cond.Message = "[warning] new version released in progress of blue-green release, please rollback first"
c.NewStatus.Message = cond.Message
return utilerrors.NewFatalError(fmt.Errorf("cannot do continuous release for blue-green release, rollback firstly"))
}

done, err := r.doProgressingReset(c)
if err != nil {
klog.Errorf("rollout(%s/%s) doProgressingReset failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
Expand Down
7 changes: 1 addition & 6 deletions pkg/util/client/delegating_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,7 @@ func (d *delegatingReader) List(ctx context.Context, list client.ObjectList, opt
return d.CacheReader.List(ctx, list, opts...)
}

var DisableDeepCopy = disableDeepCopy{}

type disableDeepCopy struct{}

func (_ disableDeepCopy) ApplyToList(_ *client.ListOptions) {
}
var DisableDeepCopy = client.UnsafeDisableDeepCopy

func isDisableDeepCopy(opts []client.ListOption) bool {
for _, opt := range opts {
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/controller_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1beta1.O
return &Workload{IsStatusConsistent: false}, nil
}
// stable replicaSet
stableRs, err := r.getDeploymentStableRs(stable)
stableRs, err := r.GetDeploymentStableRs(stable)
if err != nil || stableRs == nil {
return &Workload{IsStatusConsistent: false}, err
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1beta1.O
if err != nil || canary == nil {
return workload, err
}
canaryRs, err := r.getDeploymentStableRs(canary)
canaryRs, err := r.GetDeploymentStableRs(canary)
if err != nil || canaryRs == nil {
return workload, err
}
Expand Down Expand Up @@ -422,7 +422,7 @@ func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([]
return rss, nil
}

func (r *ControllerFinder) getDeploymentStableRs(obj *apps.Deployment) (*apps.ReplicaSet, error) {
func (r *ControllerFinder) GetDeploymentStableRs(obj *apps.Deployment) (*apps.ReplicaSet, error) {
rss, err := r.GetReplicaSetsForDeployment(obj)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/webhook/util/writer/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func prepareToWrite(dir string) error {
// TODO: figure out if we can reduce the permission. (Now it's 0777)
err = os.MkdirAll(dir, 0777)
if err != nil {
return fmt.Errorf("can't create dir: %v", dir)
return fmt.Errorf("can't create dir: %v, err: %s", dir, err.Error())
}
case err != nil:
return err
Expand Down
Loading

0 comments on commit b8bd5b5

Please sign in to comment.