Skip to content

Commit

Permalink
Add Traffic Mirroring for Istio Service Mesh
Browse files Browse the repository at this point in the history
Traffic mirroring is a pre-stage for canary deployments.  When mirroring
is enabled, at the beginning of a canary deployment traffic is mirrored
to the canary instead of shifted for one canary period.  The service
mesh should mirror by copying the request and sending one copy to the
primary and one copy to the canary; only the response from the primary
is sent to the user.  The response from the canary is only used for
collecting metrics.

Once the mirror period is over, the canary proceeds as usual, shifting
traffic from primary to canary until complete.

Added TestScheduler_Mirroring unit test.
  • Loading branch information
Andrew Jenkins committed Sep 24, 2019
1 parent 655df36 commit 3fa892c
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/apis/flagger/v1alpha3/types.go
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type CanaryAnalysis struct {
Interval string `json:"interval"`
Threshold int `json:"threshold"`
MaxWeight int `json:"maxWeight"`
Mirror bool `json:"mirror,omitempty"`
StepWeight int `json:"stepWeight"`
Metrics []CanaryMetric `json:"metrics"`
Webhooks []CanaryWebhook `json:"webhooks,omitempty"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ func newTestCanary() *v1alpha3.Canary {
return cd
}

func newTestCanaryMirror() *v1alpha3.Canary {
cd := newTestCanary()
cd.Spec.CanaryAnalysis.Mirror = true
return cd
}

func newTestCanaryAB() *v1alpha3.Canary {
cd := &v1alpha3.Canary{
TypeMeta: metav1.TypeMeta{APIVersion: v1alpha3.SchemeGroupVersion.String()},
Expand Down
39 changes: 29 additions & 10 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh
}

// check if the canary success rate is above the threshold
// skip check if no traffic is routed to canary
if canaryWeight == 0 && cd.Status.Iterations == 0 {
// skip check if no traffic is routed or mirrored to canary
if canaryWeight == 0 && cd.Status.Iterations == 0 &&
(cd.Spec.CanaryAnalysis.Mirror == false || mirrored == false) {
c.recordEventInfof(cd, "Starting canary analysis for %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

// run pre-rollout web hooks
Expand Down Expand Up @@ -429,16 +430,34 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh
if cd.Spec.CanaryAnalysis.StepWeight > 0 {
// increase traffic weight
if canaryWeight < maxWeight {
primaryWeight -= cd.Spec.CanaryAnalysis.StepWeight
if primaryWeight < 0 {
primaryWeight = 0
}
canaryWeight += cd.Spec.CanaryAnalysis.StepWeight
if canaryWeight > 100 {
canaryWeight = 100
// If in "mirror" mode, do one step of mirroring before shifting traffic to canary.
// When mirroring, all requests go to primary and canary, but only responses from
// primary go back to the user.
if cd.Spec.CanaryAnalysis.Mirror && canaryWeight == 0 {
if mirrored == false {
mirrored = true
primaryWeight = 100
canaryWeight = 0
} else {
mirrored = false
primaryWeight = 100 - cd.Spec.CanaryAnalysis.StepWeight
canaryWeight = cd.Spec.CanaryAnalysis.StepWeight
}
c.logger.With("canary", fmt.Sprintf("%s.%s", name, namespace)).
Infof("Running mirror step %d/%d/%t", primaryWeight, canaryWeight, mirrored)
} else {

primaryWeight -= cd.Spec.CanaryAnalysis.StepWeight
if primaryWeight < 0 {
primaryWeight = 0
}
canaryWeight += cd.Spec.CanaryAnalysis.StepWeight
if canaryWeight > 100 {
canaryWeight = 100
}
}

if err := meshRouter.SetRoutes(cd, primaryWeight, canaryWeight); err != nil {
if err := meshRouter.SetRoutes(cd, primaryWeight, canaryWeight, mirrored); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,64 @@ func TestScheduler_Promotion(t *testing.T) {
}
}

func TestScheduler_Mirroring(t *testing.T) {
mocks := SetupMocks(newTestCanaryMirror())
// init
mocks.ctrl.advanceCanary("podinfo", "default", true)

// update
dep2 := newTestDeploymentV2()
_, err := mocks.kubeClient.AppsV1().Deployments("default").Update(dep2)
if err != nil {
t.Fatal(err.Error())
}

// detect pod spec changes
mocks.ctrl.advanceCanary("podinfo", "default", true)

// advance
mocks.ctrl.advanceCanary("podinfo", "default", true)

// check if traffic is mirrored to canary
primaryWeight, canaryWeight, mirrored, err := mocks.router.GetRoutes(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}

if primaryWeight != 100 {
t.Errorf("Got primary route %v wanted %v", primaryWeight, 100)
}

if canaryWeight != 0 {
t.Errorf("Got canary route %v wanted %v", canaryWeight, 0)
}

if mirrored != true {
t.Errorf("Got mirrored %v wanted %v", mirrored, true)
}

// advance
mocks.ctrl.advanceCanary("podinfo", "default", true)

// check if traffic is mirrored to canary
primaryWeight, canaryWeight, mirrored, err = mocks.router.GetRoutes(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}

if primaryWeight != 90 {
t.Errorf("Got primary route %v wanted %v", primaryWeight, 90)
}

if canaryWeight != 10 {
t.Errorf("Got canary route %v wanted %v", canaryWeight, 10)
}

if mirrored != false {
t.Errorf("Got mirrored %v wanted %v", mirrored, false)
}
}

func TestScheduler_ABTesting(t *testing.T) {
mocks := SetupMocks(newTestCanaryAB())
// init
Expand Down

0 comments on commit 3fa892c

Please sign in to comment.