Skip to content

Commit

Permalink
traffic: Refactor continous logic
Browse files Browse the repository at this point in the history
Signed-off-by: yunbo <[email protected]>
  • Loading branch information
Funinu committed Jul 8, 2024
1 parent db761a9 commit 073db89
Show file tree
Hide file tree
Showing 6 changed files with 506 additions and 23 deletions.
2 changes: 1 addition & 1 deletion api/v1beta1/rollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ const (
// Restore the GatewayAPI/Ingress/Istio
FinalisingStepTypeGateway FinalisingStepType = "RestoreGateway"
// Delete Canary Service
FinalisingStepTypeCanaryService FinalisingStepType = "DeleteCanayService"
FinalisingStepTypeDeleteCanaryService FinalisingStepType = "DeleteCanaryService"
// Delete Batch Release
FinalisingStepTypeDeleteBR FinalisingStepType = "DeleteBatchRelease"
)
Expand Down
98 changes: 84 additions & 14 deletions pkg/controller/rollout/rollout_progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,93 @@ func isRollingBackInBatches(rollout *v1beta1.Rollout, workload *util.Workload) b
// 1. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
// 2. remove batchRelease CR.
func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) {
if len(c.Rollout.Spec.Strategy.Canary.TrafficRoutings) > 0 {
// modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
tr := newTrafficRoutingContext(c)
done, err := r.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
}
}
done, err := r.canaryManager.removeBatchRelease(c)
releaseManager, err := r.getReleaseManager(c.Rollout)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
return true, nil
// if no trafficRouting exists, simply remove batchRelease
if !c.Rollout.Spec.Strategy.HasTrafficRoutings() {
done, err := releaseManager.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
return true, nil
}
// else, do reset logic under next sequence:
// restore the gateway -> remove the batchRelease -> remove the canary Service
subStatus := c.NewStatus.GetSubStatus()
if subStatus == nil {
return true, nil
}
// To ensure respect for graceful time between these steps, we set start time at the beginning
if len(subStatus.FinalisingStep) == 0 {
subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
}
tr := newTrafficRoutingContext(c)
klog.Infof("rollout(%s/%s) Finalising Step is %s", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep)
switch subStatus.FinalisingStep {
default:
// start from FinalisingStepTypeGateway
subStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway
fallthrough
// firstly, restore the gateway resources (ingress/gatewayAPI/Istio), that means
// only stable Service will accept the traffic
case v1beta1.FinalisingStepTypeGateway:
err := r.trafficRoutingManager.RestoreGateway(tr)
if err != nil {
subStatus.LastUpdateTime = tr.LastUpdateTime
return false, err
}
// usually, GracePeriodSeconds means duration to wait after an operation done
// we use defaultGracePeriodSeconds+1 because the count start from the time before the RestoreGateway called
if subStatus.LastUpdateTime != nil && time.Since(subStatus.LastUpdateTime.Time) < time.Second*time.Duration(defaultGracePeriodSeconds+1) {
klog.Infof("rollout(%s/%s) in step (%s), and wait %d seconds", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep, defaultGracePeriodSeconds+1)
return false, nil
}
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep)
subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
subStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR
fallthrough
// secondly, remove the batchRelease. For canary release, it means the immediate deletion of
// canary deployment, for other release, the v2 pods won't be deleted immediately
// in both cases, only the stable pods (v1) accept the traffic
case v1beta1.FinalisingStepTypeDeleteBR:
done, err := releaseManager.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) Finalize batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
if subStatus.LastUpdateTime != nil && time.Since(subStatus.LastUpdateTime.Time) < time.Second*time.Duration(defaultGracePeriodSeconds+1) {
klog.Infof("rollout(%s/%s) in step (%s), and wait %d seconds", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep, defaultGracePeriodSeconds+1)
return false, nil
}
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep)
subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
subStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
fallthrough
// finally, remove the canary service. This step can swap with the last step.
/*
NOTE: we only remove the canary service, with stable service unchanged, that means the
stable service may still has the selector of stable pods, which is intended. Consider
this senario: continuous release v1->v2->3, and currently we are in step x, which expects
to route 0% traffic to v2 (or simply A/B test), if we release v3 in step x and remove the
stable service selector, then the traffic will route to both v1 and v2 before executing the
first step of v3 release.
*/
case v1beta1.FinalisingStepTypeDeleteCanaryService:
err := r.trafficRoutingManager.RemoveCanaryService(tr)
if err != nil {
subStatus.LastUpdateTime = tr.LastUpdateTime
return false, err
}
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep)
return true, nil
}
}

func (r *RolloutReconciler) recalculateCanaryStep(c *RolloutContext) (int32, error) {
Expand Down
68 changes: 65 additions & 3 deletions pkg/controller/rollout/rollout_progressing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func TestReconcileRolloutProgressing(t *testing.T) {
},
},
{
name: "ReconcileRolloutProgressing rolling -> continueRelease",
name: "ReconcileRolloutProgressing rolling -> continueRelease1",
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
Expand Down Expand Up @@ -578,10 +578,72 @@ func TestReconcileRolloutProgressing(t *testing.T) {
},
expectStatus: func() *v1beta1.RolloutStatus {
s := rolloutDemo.Status.DeepCopy()
s.CanaryStatus = nil
s.CanaryStatus.ObservedWorkloadGeneration = 2
s.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 3
s.CanaryStatus.CanaryReplicas = 5
s.CanaryStatus.CanaryReadyReplicas = 3
s.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway
s.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInitializing
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(s, *cond)
s.CurrentStepIndex = s.CanaryStatus.CurrentStepIndex
s.CurrentStepState = s.CanaryStatus.CurrentStepState
return s
},
},
{
name: "ReconcileRolloutProgressing rolling -> continueRelease2",
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
dep2 := deploymentDemo.DeepCopy()
dep2.UID = "1ca4d850-9ec3-48bd-84cb-19f2e8cf4180"
dep2.Name = dep1.Name + "-canary"
dep2.Labels[util.CanaryDeploymentLabel] = dep1.Name
rs1 := rsDemo.DeepCopy()
rs2 := rsDemo.DeepCopy()
rs2.Name = "echoserver-canary-2"
rs2.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: dep2.Name,
UID: "1ca4d850-9ec3-48bd-84cb-19f2e8cf4180",
Controller: utilpointer.BoolPtr(true),
},
}
rs2.Labels["pod-template-hash"] = "pod-template-hash-v2"
rs2.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return []*apps.Deployment{dep1, dep2}, []*apps.ReplicaSet{rs1, rs2}
},
getNetwork: func() ([]*corev1.Service, []*netv1.Ingress) {
return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{demoIngress.DeepCopy()}
},
getRollout: func() (*v1beta1.Rollout, *v1beta1.BatchRelease, *v1alpha1.TrafficRouting) {
obj := rolloutDemo.DeepCopy()
obj.Status.CanaryStatus.ObservedWorkloadGeneration = 2
obj.Status.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.CanaryReplicas = 5
obj.Status.CanaryStatus.CanaryReadyReplicas = 3
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(&obj.Status, *cond)
return obj, nil, nil
},
expectStatus: func() *v1beta1.RolloutStatus {
s := rolloutDemo.Status.DeepCopy()
s.Clear()
return s
},
},
Expand Down
21 changes: 21 additions & 0 deletions pkg/controller/rollout/rollout_releaseManager.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rollout

import (
"github.com/openkruise/rollouts/api/v1beta1"
)

type ReleaseManager interface {
// execute the core logic of a release step by step
runCanary(c *RolloutContext) error
// check the NextStepIndex field in status, if modifies detected, jump to target step
doCanaryJump(c *RolloutContext) bool
// called when user accomplishes a release / does a rollback, or disables/removes the Rollout Resource
doCanaryFinalising(c *RolloutContext) (bool, error)
// fetch the BatchRelease object
fetchBatchRelease(ns, name string) (*v1beta1.BatchRelease, error)
// remove the BatchRelease object
removeBatchRelease(c *RolloutContext) (bool, error)
}
134 changes: 129 additions & 5 deletions pkg/trafficrouting/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
}

// end-to-end canary deployment scenario(a -> b -> c), if only b or c is released,
//and a is not released in this scenario, then the canary service is not needed.
if !c.OnlyTrafficRouting && !c.DisableGenerateCanaryService {
// and a is not released in this scenario, then the canary service is not needed.
// amend: if c.disableGenerateCanaryService is true, canary service is not needed either
if !(c.OnlyTrafficRouting || c.DisableGenerateCanaryService) {
if c.StableRevision == "" || c.CanaryRevision == "" {
klog.Warningf("%s stableRevision or podTemplateHash can not be empty, and wait a moment", c.Key)
return false, nil
Expand Down Expand Up @@ -260,9 +261,9 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore
if err = trController.Finalise(context.TODO()); err != nil {
return false, err
}
// end to end deployment, don't remove the canary service;
// because canary service is stable service
if !c.OnlyTrafficRouting && !c.DisableGenerateCanaryService {
// end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service;
// because canary service is stable service (ie. no external canary service was created at all)
if !(c.OnlyTrafficRouting || c.DisableGenerateCanaryService) {
// remove canary service
err = m.Delete(context.TODO(), cService)
if err != nil && !errors.IsNotFound(err) {
Expand All @@ -274,6 +275,129 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore
return true, nil
}

// RestoreGateway restore gateway resources without graceful time
func (m *Manager) RestoreGateway(c *TrafficRoutingContext) error {
if len(c.ObjectRef) == 0 {
return nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName)
if err != nil {
klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error())
return err
}
return trController.Finalise(context.TODO())
}

// RemoveCanaryService find and delete canary Service. stable Service won't be modified
func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) error {
if len(c.ObjectRef) == 0 {
return nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service;
// because canary service is stable service (ie. no external canary service was created at all)
if !(c.OnlyTrafficRouting || c.DisableGenerateCanaryService) {
// remove canary service
err := m.Delete(context.TODO(), cService)
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error())
return err
}
klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name)
}

return nil
}

// returning (false, nil) means the update has been submitted, and no error occurred
// but we need to wait graceful time before returning true
func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
if c.OnlyTrafficRouting {
return true, nil
}

//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
// not found, wait a moment, retry
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}

if stableService.Spec.Selector[c.RevisionLabelKey] == c.StableRevision {
if c.LastUpdateTime == nil {
return true, nil
}
if time.Since(c.LastUpdateTime.Time) < time.Second*time.Duration(trafficRouting.GracePeriodSeconds) {
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, trafficRouting.GracePeriodSeconds)
return false, nil
}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success and complete", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision)
return true, nil
}

// patch stable service to only select the stable pods
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error())
return false, err
}
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, trafficRouting.GracePeriodSeconds)
return false, nil
}

// returning (false, nil) means the update has been submitted, and no error occurred
// but we need to wait graceful time before returning true
func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
}
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
}
// restore stable Service
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
}
return true, nil
}

func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) {
trafficRouting := con.ObjectRef[0]
if trafficRouting.CustomNetworkRefs != nil {
Expand Down
Loading

0 comments on commit 073db89

Please sign in to comment.