Skip to content

Commit

Permalink
imporve traffic strategy for canary and partition release
Browse files Browse the repository at this point in the history
Signed-off-by: yunbo <[email protected]>
  • Loading branch information
Funinu committed Jun 7, 2024
1 parent cddc53a commit d32837e
Show file tree
Hide file tree
Showing 9 changed files with 541 additions and 160 deletions.
19 changes: 19 additions & 0 deletions api/v1beta1/rollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ limitations under the License.
package v1beta1

import (
"reflect"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -92,6 +95,22 @@ func (r *RolloutStrategy) GetRollingStyle() RollingStyleType {
return PartitionRollingStyle
}

// simply using EnableExtraWorkloadForCanary is not enough, for example, a v1alaph1 Rollout
// can be converted to v1beta1 Rollout with EnableExtraWorkloadForCanary set as true, even the
// objectRef is cloneset (which doesn't support canary release)
func IsRealPartition(rollout *Rollout) bool {
estimation := rollout.Spec.Strategy.GetRollingStyle()
if estimation == EmptyRollingStyle || estimation == BlueGreenRollingStyle {
return false
}
targetRef := rollout.Spec.WorkloadRef
if targetRef.APIVersion == apps.SchemeGroupVersion.String() && targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() &&
estimation == CanaryRollingStyle {
return false
}
return true
}

// r.GetRollingStyle() == BlueGreenRollingStyle
func (r *RolloutStrategy) IsBlueGreenRelease() bool {
return r.GetRollingStyle() == BlueGreenRollingStyle
Expand Down
161 changes: 130 additions & 31 deletions pkg/controller/rollout/rollout_canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
tr := newTrafficRoutingContext(c)
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil {
return err
Expand All @@ -89,10 +89,59 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
switch canaryStatus.CurrentStepState {
// before CanaryStepStateUpgrade, handle some special cases, to prevent traffic loss
case v1beta1.CanaryStepStateInit:
// placeholder for the later traffic modification Pull Request
canaryStatus.NextStepIndex = util.NextBatchIndex(c.Rollout, canaryStatus.CurrentStepIndex)
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateInit)
tr := newTrafficRoutingContext(c)
if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)
return nil
}

/*
The next check is used to bypass the bug in ingress-nginx controller https://github.com/kubernetes/ingress-nginx/issues/9635
for partition release, if the currentStep replicas is "100%", we can assume that all traffic should be routed to canary pods
*/
if currentStep.Replicas.StrVal == "100%" && v1beta1.IsRealPartition(c.Rollout) {
klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.RestoreStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

/*
The next check is used to solve the following scenario:
steps:
- replicas: 1 # frist batch
matches:
- headers:
- name: user-agent
type: Exact
value: pc
we should patch selector to stable Service before CanaryStepStateUpgrade when in the first batch
otherwise, some traffic will loss between CanaryStepStateUpgrade and CanaryStepStateTrafficRouting
*/
if canaryStatus.CurrentStepIndex == 1 {
klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.PatchStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
fallthrough
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)

case v1beta1.CanaryStepStateUpgrade:
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateUpgrade)
Expand All @@ -101,6 +150,9 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
return err
} else if done {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
if currentStep.Replicas.StrVal == "100%" && v1beta1.IsRealPartition(c.Rollout) {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateMetricsAnalysis
}
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState)
Expand Down Expand Up @@ -216,6 +268,12 @@ func (m *canaryReleaseManager) doCanaryPaused(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
steps := len(c.Rollout.Spec.Strategy.Canary.Steps)
// If it is the last step, and 100% of pods, then return true
if int32(steps) == canaryStatus.CurrentStepIndex {
if currentStep.Replicas != nil && currentStep.Replicas.StrVal == "100%" {
return true, nil
}
}
cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing)
// need manual confirmation
if currentStep.Pause.Duration == nil {
Expand Down Expand Up @@ -268,8 +326,9 @@ func (m *canaryReleaseManager) doCanaryJump(c *RolloutContext) (jumped bool) {

// cleanup after rollout is completed or finished
func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
// when CanaryStatus is nil, which means canary action hasn't started yet, don't need doing cleanup
if c.NewStatus.CanaryStatus == nil {
if canaryStatus == nil {
return true, nil
}
// 1. rollout progressing complete, remove rollout progressing annotation in workload
Expand All @@ -278,33 +337,73 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro
return false, err
}
tr := newTrafficRoutingContext(c)
// 2. remove stable service the pod revision selector, so stable service will be selector all version pods.
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr, true)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
}
// 3. set workload.pause=false; set workload.partition=0
done, err = m.finalizingBatchRelease(c)
if err != nil || !done {
return done, err
}
// 4. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods.
done, err = m.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
}
// 5. delete batchRelease crd
done, err = m.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) Finalize batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
klog.Infof("rollout(%s/%s) Finalising Step is %s", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
switch canaryStatus.FinalisingStep {
default:
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeStableService
fallthrough

case v1beta1.FinalisingStepTypeStableService:
// restore stable service selector to select all pods [with grace time]
done, err := m.trafficRoutingManager.RestoreStableService(tr)
if err != nil || !done {
canaryStatus.LastUpdateTime = tr.LastUpdateTime
return done, err
}
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway

case v1beta1.FinalisingStepTypeGateway:
// modify network api(ingress or gateway api) configuration
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr)
if err != nil || !done {
canaryStatus.LastUpdateTime = tr.LastUpdateTime
return done, err
}
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeCanaryService

/*
//TODO - As mentioned in FinalisingTrafficRouting function,
we should wait grace time between FinalisingStepTypeGateway and FinalisingStepTypeCanaryService
to avoid a very rare case which could cause minor traffic loss (espically, Istio), but it's difficult
to implement now.
However, we still reserve the FinalisingStepTypeCanaryService step here, but instead of removing the
canary Service as expected (which has been done in FinalisingStepTypeGateway), FinalisingStepTypeCanaryService
simply wait a gracetime between FinalisingStepTypeCanaryService and FinalisingStepTypeDeleteBR now
*/
case v1beta1.FinalisingStepTypeCanaryService:
// wait a gracetime for safety
if canaryStatus.LastUpdateTime != nil {
if verifyTime := canaryStatus.LastUpdateTime.Add(time.Second * time.Duration(3)); verifyTime.After(time.Now()) {
klog.Infof("restoring network configuration, but we need to wait %d seconds", 3)
return false, nil
}
}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeBatchRelease

case v1beta1.FinalisingStepTypeBatchRelease:
// set workload.pause=false; set workload.partition=0
done, err := m.finalizingBatchRelease(c)
if err != nil || !done {
return done, err
}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR

case v1beta1.FinalisingStepTypeDeleteBR:
// delete batchRelease crd
done, err := m.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) Finalize batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
klog.Infof("rollout(%s/%s) doCanaryFinalising success", c.Rollout.Namespace, c.Rollout.Name)
return true, nil
}
klog.Infof("rollout(%s/%s) doCanaryFinalising success", c.Rollout.Namespace, c.Rollout.Name)
return true, nil

return false, nil
}

func (m *canaryReleaseManager) removeRolloutProgressingAnnotation(c *RolloutContext) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/rollout/rollout_canary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand All @@ -76,6 +77,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand Down Expand Up @@ -138,6 +140,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand Down Expand Up @@ -183,6 +186,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.CanaryReplicas = 1
s.CanaryStatus.CanaryReadyReplicas = 1
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand Down Expand Up @@ -287,6 +291,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.NextStepIndex = 4
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
Expand All @@ -298,6 +303,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.CanaryStatus.CurrentStepIndex = 3
obj.CanaryStatus.NextStepIndex = 4
obj.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
Expand Down
26 changes: 22 additions & 4 deletions pkg/controller/rollout/rollout_progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type RolloutContext struct {
RecheckTime *time.Time
// wait stable workload pods ready
WaitReady bool
// finalising reason
FinalizeReason string
}

// parameter1 retryReconcile, parameter2 error
Expand Down Expand Up @@ -122,6 +124,7 @@ func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1beta1.Rollout
klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
var done bool
rolloutContext.WaitReady = true
rolloutContext.FinalizeReason = v1beta1.FinaliseReasonSuccess
done, err = r.doFinalising(rolloutContext)
if err != nil {
return nil, err
Expand All @@ -146,6 +149,7 @@ func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1beta1.Rollout
case v1alpha1.ProgressingReasonCancelling:
klog.Infof("rollout(%s/%s) is Progressing, and in reason(%s)", rollout.Namespace, rollout.Name, cond.Reason)
var done bool
rolloutContext.FinalizeReason = v1beta1.FinaliseReasonRollback
done, err = r.doFinalising(rolloutContext)
if err != nil {
return nil, err
Expand Down Expand Up @@ -411,22 +415,36 @@ func isRollingBackInBatches(rollout *v1beta1.Rollout, workload *util.Workload) b
// 1. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
// 2. remove batchRelease CR.
func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) {
if len(c.Rollout.Spec.Strategy.Canary.TrafficRoutings) > 0 {
if c.Rollout.Spec.Strategy.HasTrafficRoutings() {
// modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
tr := newTrafficRoutingContext(c)
done, err := r.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
done, err := r.trafficRoutingManager.RouteAllTrafficToCanaryORStable(tr, v1beta1.FinaliseReasonContinuous)
if err != nil || !done {
c.NewStatus.GetSubStatus().LastUpdateTime = tr.LastUpdateTime
return done, err
}
}
done, err := r.canaryManager.removeBatchRelease(c)

releaseManager, err := r.getReleaseManager(c.Rollout)
if err != nil {
return false, err
}
done, err := releaseManager.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
// restore Service and network api
if c.Rollout.Spec.Strategy.HasTrafficRoutings() {
tr := newTrafficRoutingContext(c)
done, err = r.trafficRoutingManager.FinalisingTrafficRouting(tr)
c.NewStatus.GetSubStatus().LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
}
}
return true, nil
}

Expand Down
Loading

0 comments on commit d32837e

Please sign in to comment.