Skip to content

Commit

Permalink
rolling deployment in partition-style
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <[email protected]>
  • Loading branch information
mingzhou.swx committed Feb 9, 2023
1 parent 843e8b8 commit 2a52b0c
Show file tree
Hide file tree
Showing 45 changed files with 2,087 additions and 139 deletions.
20 changes: 19 additions & 1 deletion .github/workflows/e2e-advanced-deployment-1.19.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
- name: Run E2E Tests For Deployment Controller
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
Expand All @@ -108,3 +108,21 @@ jobs:
exit 1
fi
exit $retVal
- name: Run E2E Tests For Control Plane
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment canary rollout with Ingress' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal
20 changes: 19 additions & 1 deletion .github/workflows/e2e-advanced-deployment-1.23.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
- name: Run E2E Tests For Deployment Controller
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
Expand All @@ -107,4 +107,22 @@ jobs:
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal
- name: Run E2E Tests For Control Plane
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment canary rollout with Ingress' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal
4 changes: 4 additions & 0 deletions api/v1alpha1/deployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ const (
// DeploymentExtraStatusAnnotation is annotation for deployment,
// which is extra status field of Advanced Deployment.
DeploymentExtraStatusAnnotation = "rollouts.kruise.io/deployment-extra-status"

// DeploymentStableRevisionLabel is label for deployment,
// which record the stable revision during the current rolling process.
DeploymentStableRevisionLabel = "rollouts.kruise.io/stable-revision"
)

// DeploymentStrategy is strategy field for Advanced Deployment
Expand Down
13 changes: 8 additions & 5 deletions api/v1alpha1/rollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ const (
// RollbackInBatchAnnotation allow use disable quick rollback, and will roll back in batch style.
RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch"

// DeploymentRolloutStyleAnnotation define the rolling behavior for Deployment.
// RolloutStyleAnnotation define the rolling behavior for Deployment.
// must be "partition" or "canary":
// * "partition" means rolling Deployment in batches just like CloneSet, and will NOT create any extra Deployment;
// * "canary" means rolling in canary way, and will create a canary Deployment.
// Defaults to canary
DeploymentRolloutStyleAnnotation = "rollouts.kruise.io/deployment-rolling-style"
// * "partition" means rolling in batches just like CloneSet, and will NOT create any extra Workload;
// * "canary" means rolling in canary way, and will create a canary Workload.
// Currently, only Deployment support both "partition" and "canary" rolling styles.
// For other workload types, they only support "partition" styles.
// Defaults to "canary" to Deployment.
// Defaults to "partition" to the others.
RolloutStyleAnnotation = "rollouts.kruise.io/rolling-style"
)

// RolloutSpec defines the desired state of Rollout
Expand Down
5 changes: 4 additions & 1 deletion config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ spec:
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--feature-gates=AdvancedDeployment=true"
- "--v=3"
- "--v=5"
env:
- name: KUBE_CACHE_MUTATION_DETECTOR
value: "true"
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
utilfeature "github.com/openkruise/rollouts/pkg/util/feature"
"github.com/openkruise/rollouts/pkg/webhook"
"github.com/spf13/pflag"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -57,6 +58,7 @@ func init() {
utilruntime.Must(kruisev1beta1.AddToScheme(scheme))
utilruntime.Must(rolloutsv1alpha1.AddToScheme(scheme))
utilruntime.Must(gatewayv1alpha2.AddToScheme(scheme))
utilruntime.Must(admissionregistrationv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

Expand Down
25 changes: 20 additions & 5 deletions pkg/controller/batchrelease/batchrelease_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package batchrelease
import (
"fmt"
"reflect"
"strings"
"time"

appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/control"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment"
canarydeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/cloneset"
partitiondeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/deployment"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/statefulset"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -100,6 +102,8 @@ func (r *Executor) executeBatchReleasePlan(release *v1alpha1.BatchRelease, newSt
case err == nil:
newStatus.Phase = v1alpha1.RolloutPhaseProgressing
result = reconcile.Result{RequeueAfter: DefaultDuration}
default:
klog.Warningf("Failed to initialize %v, err %v", klog.KObj(release), err)
}

case v1alpha1.RolloutPhaseProgressing:
Expand All @@ -111,6 +115,8 @@ func (r *Executor) executeBatchReleasePlan(release *v1alpha1.BatchRelease, newSt
switch {
case err == nil:
newStatus.Phase = v1alpha1.RolloutPhaseCompleted
default:
klog.Warningf("Failed to finalize %v, err %v", klog.KObj(release), err)
}

case v1alpha1.RolloutPhaseCompleted:
Expand Down Expand Up @@ -140,6 +146,8 @@ func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1
case err == nil:
result = reconcile.Result{RequeueAfter: DefaultDuration}
newStatus.CanaryStatus.CurrentBatchState = v1alpha1.VerifyingBatchState
default:
klog.Warningf("Failed to upgrade %v, err %v", klog.KObj(release), err)
}

case v1alpha1.VerifyingBatchState:
Expand All @@ -149,6 +157,7 @@ func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1
case err != nil:
// should go to upgrade state to do again to avoid dead wait.
newStatus.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
klog.Warningf("%v current batch is not ready, err %v", klog.KObj(release), err)
default:
now := metav1.Now()
newStatus.CanaryStatus.BatchReadyTime = &now
Expand All @@ -164,6 +173,7 @@ func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1
// if the batch ready condition changed due to some reasons, just recalculate the current batch.
newStatus.CanaryStatus.BatchReadyTime = nil
newStatus.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState
klog.Warningf("%v current batch is not ready, err %v", klog.KObj(release), err)
case !isPartitioned(release):
r.moveToNextBatch(release, newStatus)
result = reconcile.Result{RequeueAfter: DefaultDuration}
Expand Down Expand Up @@ -195,19 +205,24 @@ func (r *Executor) getReleaseController(release *v1alpha1.BatchRelease, newStatu
switch targetRef.APIVersion {
case appsv1alpha1.GroupVersion.String():
if targetRef.Kind == reflect.TypeOf(appsv1alpha1.CloneSet{}).Name() {
klog.InfoS("Using CloneSet batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
klog.InfoS("Using CloneSet partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return partitionstyle.NewControlPlane(cloneset.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil
}

case apps.SchemeGroupVersion.String():
if targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() {
klog.InfoS("Using Deployment batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return canarystyle.NewControlPlane(deployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil
if strings.EqualFold(release.Annotations[v1alpha1.RolloutStyleAnnotation], string(v1alpha1.PartitionRollingStyle)) {
klog.InfoS("Using Deployment partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return partitionstyle.NewControlPlane(partitiondeployment.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil
} else {
klog.InfoS("Using Deployment canary-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return canarystyle.NewControlPlane(canarydeployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil
}
}
}

// try to use StatefulSet-like rollout controller by default
klog.InfoS("Using StatefulSet-like batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
klog.InfoS("Using StatefulSet-Like partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return partitionstyle.NewControlPlane(statefulset.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/batchrelease/batchrelease_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *Executor) syncStatusBeforeExecuting(release *v1alpha1.BatchRelease, new
// (3). Plan is changed during rollout
// (4). Plan status is unexpected/unhealthy
case isPlanCompleted(release):
message = "release plan has been terminated, will do nothing"
message = "release plan has been completed, will do nothing"
needStopThisRound = true

case isPlanFinalizing(release):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (rc *realCanaryController) UpgradeBatch() error {
}

batchContext := rc.CalculateBatchContext(rc.release)
klog.Infof("BatchRelease %v upgrade batch: %s", klog.KObj(rc.release), batchContext.Log())
klog.Infof("BatchRelease %v calculated context when upgrade batch: %s",
klog.KObj(rc.release), batchContext.Log())

return canary.UpgradeBatch(batchContext)
}
Expand All @@ -129,7 +130,8 @@ func (rc *realCanaryController) CheckBatchReady() error {
}

batchContext := rc.CalculateBatchContext(rc.release)
klog.Infof("BatchRelease %v check batch: %s", klog.KObj(rc.release), batchContext.Log())
klog.Infof("BatchRelease %v calculated context when check batch ready: %s",
klog.KObj(rc.release), batchContext.Log())

return batchContext.IsBatchReady()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ func (r *realCanaryController) UpgradeBatch(ctx *batchcontext.BatchContext) erro
}

body := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desired)
if err := r.canaryClient.Patch(context.TODO(), deployment, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
return err
}
klog.Infof("Successfully submit rolling replicas %d to Deployment %v", desired, klog.KObj(deployment))
return nil
return r.canaryClient.Patch(context.TODO(), deployment, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

func (r *realCanaryController) Create(release *v1alpha1.BatchRelease) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -53,37 +52,27 @@ func (rc *realStableController) Initialize(release *v1alpha1.BatchRelease) error
owner := control.BuildReleaseControlInfo(release)

body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, util.BatchReleaseControlAnnotation, owner)
if err := rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
return err
}
klog.Infof("Successfully claim Deployment %v", klog.KObj(rc.stableObject))
return nil
return rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

func (rc *realStableController) Finalize(release *v1alpha1.BatchRelease) (err error) {
func (rc *realStableController) Finalize(release *v1alpha1.BatchRelease) error {
if rc.stableObject == nil {
return nil // no need to process deleted object
}

defer func() {
if err == nil {
klog.Infof("Successfully finalize Deployment %v", klog.KObj(rc.stableObject))
}
}()

// if batchPartition == nil, workload should be promoted;
pause := release.Spec.ReleasePlan.BatchPartition != nil
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}},"spec":{"paused":%v}}`,
util.BatchReleaseControlAnnotation, pause)

d := util.GetEmptyObjectWithKey(rc.stableObject)
if err = rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
return
if err := rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
return err
}
if control.ShouldWaitResume(release) {
err = waitAllUpdatedAndReady(d.(*apps.Deployment))
return waitAllUpdatedAndReady(d.(*apps.Deployment))
}
return
return nil
}

func waitAllUpdatedAndReady(deployment *apps.Deployment) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -50,7 +49,7 @@ func NewController(cli client.Client, key types.NamespacedName, _ schema.GroupVe
}
}

func (rc *realController) GetInfo() *util.WorkloadInfo {
func (rc *realController) GetWorkloadInfo() *util.WorkloadInfo {
return rc.WorkloadInfo
}

Expand Down Expand Up @@ -85,11 +84,7 @@ func (rc *realController) Initialize(release *v1alpha1.BatchRelease) error {
owner := control.BuildReleaseControlInfo(release)
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}},"spec":{"updateStrategy":{"paused":%v,"partition":"%s"}}}`,
util.BatchReleaseControlAnnotation, owner, false, "100%")
if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
return err
}
klog.Infof("Successfully initialized CloneSet %v", klog.KObj(clone))
return nil
return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body)))
}

func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
Expand All @@ -112,11 +107,7 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error {
}

clone := util.GetEmptyObjectWithKey(rc.object)
if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
return err
}
klog.Infof("Successfully submit partition %v for CloneSet %v", ctx.DesiredPartition, klog.KObj(clone))
return nil
return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body)))
}

func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error {
Expand All @@ -133,11 +124,7 @@ func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error {
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}%s}`, util.BatchReleaseControlAnnotation, specBody)

clone := util.GetEmptyObjectWithKey(rc.object)
if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil {
return err
}
klog.Infof("Successfully finalize StatefulSet %v", klog.KObj(rc.object))
return nil
return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body)))
}

func (rc *realController) CalculateBatchContext(release *v1alpha1.BatchRelease) (*batchcontext.BatchContext, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func TestRealController(t *testing.T) {
Expect(cli.Get(context.TODO(), cloneKey, fetch)).NotTo(HaveOccurred())
Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal(""))

stableInfo := controller.GetInfo()
stableInfo := controller.GetWorkloadInfo()
Expect(stableInfo).ShouldNot(BeNil())
checkWorkloadInfo(stableInfo, clone)
}
Expand Down
Loading

0 comments on commit 2a52b0c

Please sign in to comment.