Skip to content

Commit

Permalink
Instrument TA with prometheus (#1030)
Browse files Browse the repository at this point in the history
* Instrument TA with prometheus

* Added metrics for targets per collector

* Remove auth line

* Updated based on feedback

* Small exporting change
  • Loading branch information
jaronoff97 authored Aug 15, 2022
1 parent 07b9f22 commit 388a7c9
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 2 deletions.
23 changes: 23 additions & 0 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,26 @@ import (
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
)

var (
collectorsAllocatable = promauto.NewGauge(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_collectors_allocatable",
Help: "Number of collectors the allocator is able to allocate to.",
})
targetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets_per_collector",
Help: "The number of targets for each collector.",
}, []string{"collector_name"})
timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "opentelemetry_allocator_time_to_allocate",
Help: "The time it takes to allocate",
}, []string{"method"})
)

/*
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets <- these are configured using least connection
Load balancer will need information about the collectors in order to set the URLs
Expand Down Expand Up @@ -96,12 +113,15 @@ func (allocator *Allocator) SetCollectors(collectors []string) {
for _, i := range collectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
collectorsAllocatable.Set(float64(len(collectors)))
}

// Reallocate needs to be called to process the new target updates.
// Until Reallocate is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.removeOutdatedTargets()
allocator.processWaitingTargets()
Expand All @@ -110,6 +130,8 @@ func (allocator *Allocator) AllocateTargets() {
// ReallocateCollectors reallocates the targets among the new collector instances
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.TargetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
Expand Down Expand Up @@ -139,6 +161,7 @@ func (allocator *Allocator) processWaitingTargets() {
Collector: col,
}
col.NumTargets++
targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets))
allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem
}
}
Expand Down
9 changes: 8 additions & 1 deletion cmd/otel-allocator/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -20,7 +22,11 @@ const (
)

var (
ns = os.Getenv("OTELCOL_NAMESPACE")
ns = os.Getenv("OTELCOL_NAMESPACE")
collectors = promauto.NewGauge(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_collectors_discovered",
Help: "Number of collectors discovered.",
})
)

type Client struct {
Expand Down Expand Up @@ -89,6 +95,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(
func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]bool, fn func(collectors []string)) string {
log := k.log.WithValues("component", "opentelemetry-targetallocator")
for {
collectors.Set(float64(len(collectorMap)))
select {
case <-k.close:
return "kubernetes client closed"
Expand Down
12 changes: 12 additions & 0 deletions cmd/otel-allocator/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@ import (
"github.com/go-logr/logr"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
)

var (
targetsDiscovered = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets",
Help: "Number of targets discovered.",
}, []string{"job_name"})
)

type Manager struct {
log logr.Logger
manager *discovery.Manager
Expand Down Expand Up @@ -63,15 +72,18 @@ func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) {
targets := []allocation.TargetItem{}

for jobName, tgs := range tsets {
var count float64 = 0
for _, tg := range tgs {
for _, t := range tg.Targets {
count++
targets = append(targets, allocation.TargetItem{
JobName: jobName,
TargetURL: string(t[model.AddressLabel]),
Label: t.Merge(tg.Labels),
})
}
}
targetsDiscovered.WithLabelValues(jobName).Set(count)
}
fn(targets)
}
Expand Down
27 changes: 26 additions & 1 deletion cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
ctrl "sigs.k8s.io/controller-runtime"
)

var (
setupLog = ctrl.Log.WithName("setup")
setupLog = ctrl.Log.WithName("setup")
httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "opentelemetry_allocator_http_duration_seconds",
Help: "Duration of received HTTP requests.",
}, []string{"path"})
eventsMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "opentelemetry_allocator_events",
Help: "Number of events in the channel.",
}, []string{"source"})
)

func main() {
Expand Down Expand Up @@ -75,6 +86,7 @@ func main() {
}
os.Exit(0)
case event := <-watcher.Events:
eventsMetric.WithLabelValues(event.Source.String()).Inc()
switch event.Source {
case allocatorWatcher.EventSourceConfigMap:
setupLog.Info("ConfigMap updated!")
Expand Down Expand Up @@ -129,8 +141,10 @@ func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManage
k8sClient: k8sclient,
}
router := mux.NewRouter().UseEncodedPath()
router.Use(s.PrometheusMiddleware)
router.HandleFunc("/jobs", s.JobHandler).Methods("GET")
router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET")
router.Path("/metrics").Handler(promhttp.Handler())
s.server = &http.Server{Addr: *cliConf.ListenAddr, Handler: router}
return s, nil
}
Expand Down Expand Up @@ -177,6 +191,17 @@ func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) {
jsonHandler(w, r, displayData)
}

// PrometheusMiddleware implements mux.MiddlewareFunc.
func (s *server) PrometheusMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
timer := prometheus.NewTimer(httpDuration.WithLabelValues(path))
next.ServeHTTP(w, r)
timer.ObserveDuration()
})
}

func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()["collector_id"]

Expand Down
11 changes: 11 additions & 0 deletions cmd/otel-allocator/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ const (
EventSourcePrometheusCR
)

var (
eventSourceToString = map[EventSource]string{
EventSourceConfigMap: "EventSourceConfigMap",
EventSourcePrometheusCR: "EventSourcePrometheusCR",
}
)

func (e EventSource) String() string {
return eventSourceToString[e]
}

func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator *allocation.Allocator) (*Manager, error) {
watcher := Manager{
allocator: allocator,
Expand Down

0 comments on commit 388a7c9

Please sign in to comment.