Skip to content

Commit

Permalink
Add Initial Prometheus Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dthomson25 committed Mar 26, 2019
1 parent f8c9c38 commit 6ae22c7
Show file tree
Hide file tree
Showing 14 changed files with 1,396 additions and 46 deletions.
77 changes: 75 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 17 additions & 16 deletions controller/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/controller"

"github.com/argoproj/argo-rollouts/controller/metrics"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/annotations"
"github.com/argoproj/argo-rollouts/utils/conditions"
Expand All @@ -20,73 +21,73 @@ func (c *Controller) rolloutBlueGreen(r *v1alpha1.Rollout, rsList []*appsv1.Repl
logCtx := logutil.WithRollout(r)
newRS, oldRSs, err := c.getAllReplicaSetsAndSyncRevision(r, rsList, true)
if err != nil {
return err
return c.metricsServer.IncError(r, err)
}
previewSvc, activeSvc, err := c.getPreviewAndActiveServices(r)
if err != nil {
return err
return c.metricsServer.IncError(r, err)
}
allRSs := append(oldRSs, newRS)

// Scale up, if we can.
logCtx.Infof("Reconciling new ReplicaSet '%s'", newRS.Name)
scaledUp, err := c.reconcileNewReplicaSet(allRSs, newRS, r)
if err != nil {
return err
return c.metricsServer.IncError(r, err)
}
if scaledUp {
logCtx.Infof("Not finished reconciling new ReplicaSet '%s'", newRS.Name)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false, metrics.Progressing)
}

if previewSvc != nil {
logCtx.Infof("Reconciling preview service '%s'", previewSvc.Name)
switchPreviewSvc, err := c.reconcilePreviewService(r, newRS, previewSvc, activeSvc)
if err != nil {
return err
return c.metricsServer.IncError(r, err)
}
if switchPreviewSvc {
logCtx.Infof("Not finished reconciling preview service' %s'", previewSvc.Name)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, true)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, true, metrics.Progressing)
}
}

logCtx.Info("Reconciling pause before switching active service")
pauseBeforeSwitchActive := c.reconcileBlueGreenPause(activeSvc, r)
if pauseBeforeSwitchActive {
logCtx.Info("Not finished reconciling pause before switching active service")
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, true)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, true, metrics.Progressing)
}

logCtx.Infof("Reconciling active service '%s'", activeSvc.Name)
switchActiveSvc, err := c.reconcileActiveService(r, newRS, previewSvc, activeSvc)
if err != nil {
return err
return c.metricsServer.IncError(r, err)
}
if switchActiveSvc {
logCtx.Infof("Not Finished reconciling active service '%s'", activeSvc.Name)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false, metrics.Progressing)
}

// Scale down, if we can.
logCtx.Info("Reconciling old replica sets")
scaledDown, err := c.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, r)
if err != nil {
return err
return c.metricsServer.IncError(r, err)
}
if scaledDown {
logCtx.Info("Not finished reconciling old replica sets")
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false)
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false, metrics.Progressing)
}

logCtx.Infof("Confirming rollout is complete")
if conditions.RolloutComplete(r, &r.Status) {
logCtx.Info("Cleaning up old replicasets")
if err := c.cleanupRollouts(oldRSs, r); err != nil {
return err
return c.metricsServer.IncError(r, err)
}
}
return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false)

return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false, metrics.Completed)
}

func (c *Controller) reconcileBlueGreenPause(activeSvc *corev1.Service, rollout *v1alpha1.Rollout) bool {
Expand Down Expand Up @@ -131,7 +132,7 @@ func (c *Controller) scaleDownOldReplicaSetsForBlueGreen(allRSs []*appsv1.Replic
return totalScaledDown, nil
}

func (c *Controller) syncRolloutStatusBlueGreen(allRSs []*appsv1.ReplicaSet, newRS *appsv1.ReplicaSet, previewSvc *corev1.Service, activeSvc *corev1.Service, r *v1alpha1.Rollout, addPause bool) error {
func (c *Controller) syncRolloutStatusBlueGreen(allRSs []*appsv1.ReplicaSet, newRS *appsv1.ReplicaSet, previewSvc *corev1.Service, activeSvc *corev1.Service, r *v1alpha1.Rollout, addPause bool, phase metrics.ReconcilePhase) error {
newStatus := c.calculateBaseStatus(allRSs, newRS, r)
previewSelector, ok := c.getRolloutSelectorLabel(previewSvc)
if !ok {
Expand All @@ -158,7 +159,7 @@ func (c *Controller) syncRolloutStatusBlueGreen(allRSs []*appsv1.ReplicaSet, new

pauseStartTime, paused := calculatePauseStatus(r, addPause)
newStatus.PauseStartTime = pauseStartTime
return c.persistRolloutStatus(r, &newStatus, &paused)
return c.persistRolloutStatus(r, &newStatus, &paused, phase)
}

// Should run only on scaling events and not during the normal rollout process.
Expand Down
31 changes: 19 additions & 12 deletions controller/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/utils/pointer"

"github.com/argoproj/argo-rollouts/controller/metrics"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
"github.com/argoproj/argo-rollouts/utils/defaults"
Expand All @@ -21,7 +22,7 @@ func (c *Controller) rolloutCanary(rollout *v1alpha1.Rollout, rsList []*appsv1.R
logCtx.Info("List of Canary steps have changed and need to reset CurrentStepIndex")
newRS, previousRSs, err := c.getAllReplicaSetsAndSyncRevision(rollout, rsList, false)
if err != nil {
return err
return c.metricsServer.IncError(rollout, err)
}
stableRS, oldRSs := replicasetutil.GetStableRS(rollout, newRS, previousRSs)
return c.syncRolloutStatusCanary(oldRSs, newRS, stableRS, rollout)
Expand All @@ -39,7 +40,7 @@ func (c *Controller) rolloutCanary(rollout *v1alpha1.Rollout, rsList []*appsv1.R
newRS, previousRSs, err := c.getAllReplicaSetsAndSyncRevision(rollout, rsList, true)
stableRS, oldRSs := replicasetutil.GetStableRS(rollout, newRS, previousRSs)
if err != nil {
return err
return c.metricsServer.IncError(rollout, err)
}
allRSs := append(oldRSs, newRS)
if stableRS != nil {
Expand All @@ -49,7 +50,7 @@ func (c *Controller) rolloutCanary(rollout *v1alpha1.Rollout, rsList []*appsv1.R
logCtx.Info("Reconciling StableRS")
scaledStableRS, err := c.reconcileStableRS(oldRSs, newRS, stableRS, rollout)
if err != nil {
return err
return c.metricsServer.IncError(rollout, err)
}
if scaledStableRS {
logCtx.Infof("Not finished reconciling stableRS")
Expand All @@ -59,7 +60,7 @@ func (c *Controller) rolloutCanary(rollout *v1alpha1.Rollout, rsList []*appsv1.R
logCtx.Infof("Reconciling new ReplicaSet '%s'", newRS.Name)
scaledNewRS, err := c.reconcileNewReplicaSet(allRSs, newRS, rollout)
if err != nil {
return err
return c.metricsServer.IncError(rollout, err)
}
if scaledNewRS {
logCtx.Infof("Not finished reconciling new ReplicaSet '%s'", newRS.Name)
Expand All @@ -69,7 +70,7 @@ func (c *Controller) rolloutCanary(rollout *v1alpha1.Rollout, rsList []*appsv1.R
logCtx.Info("Reconciling old replica sets")
scaledDown, err := c.reconcileOldReplicaSetsCanary(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, rollout)
if err != nil {
return err
return c.metricsServer.IncError(rollout, err)
}
if scaledDown {
logCtx.Info("Not finished reconciling old replica sets")
Expand Down Expand Up @@ -210,6 +211,7 @@ func (c *Controller) syncRolloutStatusCanary(olderRSs []*appsv1.ReplicaSet, newR
allRSs = append(allRSs, stableRS)
}
newStatus := c.calculateBaseStatus(allRSs, newRS, r)
phase := metrics.Progressing

currentStep, currentStepIndex := replicasetutil.GetCurrentCanaryStep(r)
newStatus.Canary.StableRS = r.Status.Canary.StableRS
Expand All @@ -220,29 +222,32 @@ func (c *Controller) syncRolloutStatusCanary(olderRSs []*appsv1.ReplicaSet, newR
newStatus.CurrentStepIndex = replicasetutil.ResetCurrentStepIndex(r)
if r.Status.Canary.StableRS == controller.ComputeHash(&r.Spec.Template, r.Status.CollisionCount) {
if newStatus.CurrentStepIndex != nil {
phase = metrics.Completed
logCtx.Info("Skipping all steps because the newRS is the stableRS.")
newStatus.CurrentStepIndex = pointer.Int32Ptr(stepCount)
c.recorder.Eventf(r, corev1.EventTypeNormal, "SetStepIndex", "Set Step Index to %d", int(stepCount))

}
}
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false), phase)
}

if replicasetutil.CheckPodSpecChange(r) {
newStatus.CurrentStepIndex = replicasetutil.ResetCurrentStepIndex(r)
if r.Status.Canary.StableRS == controller.ComputeHash(&r.Spec.Template, r.Status.CollisionCount) {
if newStatus.CurrentStepIndex != nil {
phase = metrics.Completed
logCtx.Info("Skipping all steps because the newRS is the stableRS.")
newStatus.CurrentStepIndex = pointer.Int32Ptr(stepCount)
c.recorder.Eventf(r, corev1.EventTypeNormal, "SetStepIndex", "Set Step Index to %d", int(stepCount))
}
}
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false), phase)
}

if r.Status.Canary.StableRS == "" {
logCtx.Info("Setting StableRS to CurrentPodHash because it is empty beforehand")
phase = metrics.Completed
newStatus.Canary.StableRS = newStatus.CurrentPodHash
if stepCount > 0 {
if stepCount != *currentStepIndex {
Expand All @@ -251,20 +256,22 @@ func (c *Controller) syncRolloutStatusCanary(olderRSs []*appsv1.ReplicaSet, newR
newStatus.CurrentStepIndex = &stepCount

}
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false), phase)
}

if stepCount == 0 {
logCtx.Info("Rollout has no steps so setting stableRS status to currentPodHash")
newStatus.Canary.StableRS = newStatus.CurrentPodHash
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
phase = metrics.Completed
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false), phase)
}

if *currentStepIndex == stepCount {
logCtx.Info("Rollout has executed every step")
phase = metrics.Completed
newStatus.CurrentStepIndex = &stepCount
newStatus.Canary.StableRS = newStatus.CurrentPodHash
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false), phase)
}

if completedCurrentCanaryStep(olderRSs, newRS, stableRS, r) {
Expand All @@ -275,13 +282,13 @@ func (c *Controller) syncRolloutStatusCanary(olderRSs []*appsv1.ReplicaSet, newR
}
logCtx.Infof("Incrementing the Current Step Index to %d", *currentStepIndex)
c.recorder.Eventf(r, corev1.EventTypeNormal, "SetStepIndex", "Set Step Index to %d", int(*currentStepIndex))
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false), phase)
}

addPause := currentStep.Pause != nil
pauseStartTime, paused := calculatePauseStatus(r, addPause)
newStatus.PauseStartTime = pauseStartTime

newStatus.CurrentStepIndex = currentStepIndex
return c.persistRolloutStatus(r, &newStatus, &paused)
return c.persistRolloutStatus(r, &newStatus, &paused, phase)
}
Loading

0 comments on commit 6ae22c7

Please sign in to comment.