Skip to content

Commit

Permalink
Add Initial Prometheus Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dthomson25 committed Mar 27, 2019
1 parent f8c9c38 commit e0c77c0
Show file tree
Hide file tree
Showing 13 changed files with 1,374 additions and 12 deletions.
77 changes: 75 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion controller/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (c *Controller) rolloutBlueGreen(r *v1alpha1.Rollout, rsList []*appsv1.Repl
return err
}
allRSs := append(oldRSs, newRS)

// Scale up, if we can.
logCtx.Infof("Reconciling new ReplicaSet '%s'", newRS.Name)
scaledUp, err := c.reconcileNewReplicaSet(allRSs, newRS, r)
Expand Down Expand Up @@ -86,6 +85,7 @@ func (c *Controller) rolloutBlueGreen(r *v1alpha1.Rollout, rsList []*appsv1.Repl
return err
}
}

return c.syncRolloutStatusBlueGreen(allRSs, newRS, previewSvc, activeSvc, r, false)
}

Expand Down
37 changes: 29 additions & 8 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -24,6 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"

"github.com/argoproj/argo-rollouts/controller/metrics"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
rolloutscheme "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/scheme"
Expand Down Expand Up @@ -67,6 +69,7 @@ type Controller struct {
replicaSetSynced cache.InformerSynced
rolloutsLister listers.RolloutLister
rolloutsSynced cache.InformerSynced
metricsServer *metrics.MetricsServer

// used for unit testing
enqueueRollout func(obj interface{})
Expand Down Expand Up @@ -105,6 +108,7 @@ func NewController(
KubeClient: kubeclientset,
Recorder: recorder,
}
metricsAddr := fmt.Sprintf("0.0.0.0:%d", 8080)

controller := &Controller{
kubeclientset: kubeclientset,
Expand All @@ -117,6 +121,7 @@ func NewController(
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts"),
recorder: recorder,
resyncPeriod: resyncPeriod,
metricsServer: metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister()),
}
controller.enqueueRollout = controller.enqueueRateLimited
controller.enqueueRolloutAfter = controller.enqueueAfter
Expand Down Expand Up @@ -171,6 +176,14 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
}

log.Info("Started workers")
go func() {
log.Infof("Starting Metric Server at %s", c.metricsServer.Addr)
err := c.metricsServer.ListenAndServe()
if err != nil {
err = errors.Wrap(err, "Starting Metric Server")
log.Fatal(err)
}
}()
<-stopCh
log.Info("Shutting down workers")

Expand Down Expand Up @@ -221,9 +234,15 @@ func (c *Controller) processNextWorkItem() bool {
// Run the syncHandler, passing it the namespace/name string of the
// Rollout resource to be synced.
if err := c.syncHandler(key); err != nil {
err := fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
namespace, name, splitErr := cache.SplitMetaNamespaceKey(key)
if splitErr != nil {
return errors.Wrapf(err, "Error splitting key %s: %s", key, splitErr.Error())
}
c.metricsServer.IncError(namespace, name)
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
return err
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
Expand All @@ -246,25 +265,27 @@ func (c *Controller) processNextWorkItem() bool {
func (c *Controller) syncHandler(key string) error {
startTime := time.Now()
log.WithField(logutil.RolloutKey, key).Infof("Started syncing rollout at (%v)", startTime)
defer func() {
log.WithField(logutil.RolloutKey, key).Infof("Finished syncing rollout (%v)", time.Since(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
rollout, err := c.rolloutsLister.Rollouts(namespace).Get(name)
if errors.IsNotFound(err) {
if k8serrors.IsNotFound(err) {
log.WithField(logutil.RolloutKey, key).Infof("Rollout %v has been deleted", key)
return nil
return err
}
if err != nil {
return err
}

// Deep-copy otherwise we are mutating our cache.
r := rollout.DeepCopy()
defer func() {
duration := time.Since(startTime)
c.metricsServer.IncReconcile(r, duration)
logCtx := logutil.WithRollout(r).WithField("time_ms", duration.Seconds()*1e3)
logCtx.Info("Reconciliation completed")
}()

prevCond := conditions.GetRolloutCondition(rollout.Status, v1alpha1.InvalidSpec)
invalidSpecCond := conditions.VerifyRolloutSpec(r, prevCond)
Expand Down
Loading

0 comments on commit e0c77c0

Please sign in to comment.