Skip to content

Commit

Permalink
Fix Makefile and forwarder metrics/tests
Browse files Browse the repository at this point in the history
Signed-off-by: Saswata Mukherjee <[email protected]>
  • Loading branch information
saswatamcode committed Oct 15, 2023
1 parent d4899ee commit 81c6df5
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 74 deletions.
36 changes: 28 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ TMP_DIR := $(shell pwd)/tmp
BIN_DIR ?= $(TMP_DIR)/bin
GIT ?= $(shell which git)

XARGS ?= $(shell which gxargs 2>/dev/null || which xargs)
GREP ?= $(shell which ggrep 2>/dev/null || which grep)

# Image URL to use all building/pushing image targets
IMG ?= quay.io/stolostron/multicluster-observability-operator:latest

Expand All @@ -33,16 +36,16 @@ docker-push:
unit-tests: unit-tests-operators unit-tests-loaders unit-tests-proxy unit-tests-collectors

unit-tests-operators:
go test -v ${VERBOSE} `go list ./operators/... | grep -v test`
go test -v ${VERBOSE} `go list ./operators/... | $(GREP) -v test`

unit-tests-loaders:
go test -v ${VERBOSE} `go list ./loaders/... | grep -v test`
go test -v ${VERBOSE} `go list ./loaders/... | $(GREP) -v test`

unit-tests-proxy:
go test -v ${VERBOSE} `go list ./proxy/... | grep -v test`
go test -v ${VERBOSE} `go list ./proxy/... | $(GREP) -v test`

unit-tests-collectors:
go test ${VERBOSE} `go list ./collectors/... | grep -v test`
go test ${VERBOSE} `go list ./collectors/... | $(GREP) -v test`

.PHONY: e2e-tests
e2e-tests:
Expand Down Expand Up @@ -93,6 +96,25 @@ shell-format: $(SHFMT)
format: ## Formats code including imports.
format: go-format shell-format

define require_clean_work_tree
@git update-index -q --ignore-submodules --refresh

@if ! git diff-files --quiet --ignore-submodules --; then \
echo >&2 "cannot $1: you have unstaged changes."; \
git diff -r --ignore-submodules -- >&2; \
echo >&2 "Please commit or stash them."; \
exit 1; \
fi

@if ! git diff-index --cached --quiet HEAD --ignore-submodules --; then \
echo >&2 "cannot $1: your index contains uncommitted changes."; \
git diff --cached -r --ignore-submodules HEAD -- >&2; \
echo >&2 "Please commit or stash them."; \
exit 1; \
fi

endef

# PROTIP:
# Add
# --cpu-profile-path string Path to CPU profile output file
Expand All @@ -112,15 +134,13 @@ NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/cl
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
github.com/NYTimes/gziphandler.{GzipHandler}=github.com/klauspost/compress/gzhttp.{GzipHandler},\
sync/atomic=go.uber.org/atomic,\
io/ioutil.{Discard,NopCloser,ReadAll,ReadDir,ReadFile,TempDir,TempFile,Writefile}" $(shell go list ./... | grep -v "internal/cortex")
io/ioutil.{Discard,NopCloser,ReadAll,ReadDir,ReadFile,TempDir,TempFile,Writefile}" ./...
@$(FAILLINT) -paths "fmt.{Print,Println}" -ignore-tests ./...
@echo ">> examining all of the Go files"
@go vet -stdmethods=false ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
@$(GOLANGCI_LINT) run
# TODO(saswatamcode): Enable this in a separate commit.
# @echo ">> ensuring Copyright headers"
@echo ">> ensuring Copyright headers $(MISS)"
# @go run ./scripts/copyright
@echo ">> detecting misspells"
@find . -type f | grep -v vendor/ | grep -vE '\./\..*' | gxargs $(MISSPELL) -error
$(call require_clean_work_tree,'detected files without copyright, run make lint and commit changes')
11 changes: 6 additions & 5 deletions collectors/metrics/cmd/metrics-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ func (o *Options) Run() error {
return err
}

cfg.Registry = metricsReg

metrics := forwarder.NewWorkerMetrics(metricsReg)
cfg.Metrics = metrics
worker, err := forwarder.New(*cfg)
if err != nil {
return errors.Wrap(err, "failed to configure metrics collector")
Expand Down Expand Up @@ -357,7 +357,7 @@ func (o *Options) Run() error {
handlers := http.NewServeMux()
collectorhttp.DebugRoutes(handlers)
collectorhttp.HealthRoutes(handlers)
collectorhttp.MetricRoutes(handlers)
collectorhttp.MetricRoutes(handlers, metricsReg)
collectorhttp.ReloadRoutes(handlers, func() error {
return worker.Reconfigure(*cfg)
})
Expand All @@ -384,7 +384,7 @@ func (o *Options) Run() error {
}
}

err = runMultiWorkers(o)
err = runMultiWorkers(o, cfg)
if err != nil {
return err
}
Expand All @@ -406,7 +406,7 @@ func (o *Options) Run() error {
return g.Run()
}

func runMultiWorkers(o *Options) error {
func runMultiWorkers(o *Options, cfg *forwarder.Config) error {
for i := 1; i < int(o.WorkerNum); i++ {
opt := &Options{
From: o.From,
Expand Down Expand Up @@ -443,6 +443,7 @@ func runMultiWorkers(o *Options) error {
return err
}

forwardCfg.Metrics = cfg.Metrics
forwardWorker, err := forwarder.New(*forwardCfg)
if err != nil {
return errors.Wrap(err, "failed to configure metrics collector")
Expand Down
4 changes: 3 additions & 1 deletion collectors/metrics/cmd/metrics-collector/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger"
)

Expand Down Expand Up @@ -41,7 +43,7 @@ func TestMultiWorkers(t *testing.T) {
stdlog.SetOutput(log.NewStdlibAdapter(l))
opt.Logger = l

err := runMultiWorkers(opt)
err := runMultiWorkers(opt, &forwarder.Config{Metrics: forwarder.NewWorkerMetrics(prometheus.NewRegistry())})
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions collectors/metrics/pkg/collectrule/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func New(cfg forwarder.Config) (*Evaluator, error) {
LimitBytes: cfg.LimitBytes,
Transformer: cfg.Transformer,

Logger: cfg.Logger,
Logger: cfg.Logger,
Metrics: cfg.Metrics,
}
from := &url.URL{
Scheme: cfg.From.Scheme,
Expand All @@ -107,7 +108,7 @@ func New(cfg forwarder.Config) (*Evaluator, error) {
evaluator.interval = 30 * time.Second
}

fromClient, err := forwarder.CreateFromClient(cfg, evaluator.interval, "evaluate_query", cfg.Logger)
fromClient, err := forwarder.CreateFromClient(cfg, cfg.Metrics, evaluator.interval, "evaluate_query", cfg.Logger)
if err != nil {
return nil, err
}
Expand Down
73 changes: 48 additions & 25 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Config struct {
Logger log.Logger
SimulatedTimeseriesFile string

Registry *prometheus.Registry
Metrics *workerMetrics
}

// Worker represents a metrics forwarding agent. It collects metrics from a source URL and forwards them to a sink.
Expand Down Expand Up @@ -96,12 +96,10 @@ type Worker struct {

status status.StatusReport

gaugeFederateSamples prometheus.Gauge
gaugeFederateFilteredSamples prometheus.Gauge
gaugeFederateErrors prometheus.Gauge
m *workerMetrics
}

func CreateFromClient(cfg Config, interval time.Duration, name string,
func CreateFromClient(cfg Config, m *workerMetrics, interval time.Duration, name string,
logger log.Logger) (*metricsclient.Client, error) {
fromTransport := metricsclient.DefaultTransport(logger, false)
if len(cfg.FromCAFile) > 0 {
Expand Down Expand Up @@ -148,12 +146,12 @@ func CreateFromClient(cfg Config, interval time.Duration, name string,
fromClient.Transport = metricshttp.NewBearerRoundTripper(cfg.FromToken, fromClient.Transport)
}

from := metricsclient.New(logger, cfg.Registry, fromClient, cfg.LimitBytes, interval, "federate_from")
from := metricsclient.New(logger, m.clientMetrics, fromClient, cfg.LimitBytes, interval, "federate_from")

return from, nil
}

func createClients(cfg Config, interval time.Duration,
func createClients(cfg Config, m *metricsclient.ClientMetrics, interval time.Duration,
logger log.Logger) (*metricsclient.Client, *metricsclient.Client, metricfamily.MultiTransformer, error) {

var transformer metricfamily.MultiTransformer
Expand Down Expand Up @@ -181,7 +179,7 @@ func createClients(cfg Config, interval time.Duration,
if len(cfg.AnonymizeLabels) > 0 {
transformer.With(metricfamily.NewMetricsAnonymizer(anonymizeSalt, cfg.AnonymizeLabels, nil))
}
from, err := CreateFromClient(cfg, interval, "federate_from", logger)
from, err := CreateFromClient(cfg, cfg.Metrics, interval, "federate_from", logger)
if err != nil {
return nil, nil, transformer, err
}
Expand All @@ -197,10 +195,46 @@ func createClients(cfg Config, interval time.Duration,
if cfg.Debug {
toClient.Transport = metricshttp.NewDebugRoundTripper(logger, toClient.Transport)
}
to := metricsclient.New(logger, cfg.Registry, toClient, cfg.LimitBytes, interval, "federate_to")
to := metricsclient.New(logger, m, toClient, cfg.LimitBytes, interval, "federate_to")
return from, to, transformer, nil
}

type workerMetrics struct {
gaugeFederateSamples prometheus.Gauge
gaugeFederateFilteredSamples prometheus.Gauge
gaugeFederateErrors prometheus.Gauge

clientMetrics *metricsclient.ClientMetrics
}

func NewWorkerMetrics(reg *prometheus.Registry) *workerMetrics {
return &workerMetrics{
gaugeFederateSamples: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "federate_samples",
Help: "Tracks the number of samples per federation",
}),
gaugeFederateFilteredSamples: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "federate_filtered_samples",
Help: "Tracks the number of samples filtered per federation",
}),
gaugeFederateErrors: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "federate_errors",
Help: "The number of times forwarding federated metrics has failed",
}),

clientMetrics: &metricsclient.ClientMetrics{
GaugeRequestRetrieve: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "metricsclient_request_retrieve",
Help: "Tracks the number of metrics retrievals",
}, []string{"client", "status_code"}),
GaugeRequestSend: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "metricsclient_request_send",
Help: "Tracks the number of metrics sends",
}, []string{"client", "status_code"}),
},
}
}

// New creates a new Worker based on the provided Config. If the Config contains invalid
// values, then an error is returned.
func New(cfg Config) (*Worker, error) {
Expand All @@ -217,25 +251,14 @@ func New(cfg Config) (*Worker, error) {
to: cfg.ToUpload,
logger: log.With(cfg.Logger, "component", "forwarder/worker"),
simulatedTimeseriesFile: cfg.SimulatedTimeseriesFile,
gaugeFederateSamples: promauto.With(cfg.Registry).NewGauge(prometheus.GaugeOpts{
Name: "federate_samples",
Help: "Tracks the number of samples per federation",
}),
gaugeFederateFilteredSamples: promauto.With(cfg.Registry).NewGauge(prometheus.GaugeOpts{
Name: "federate_filtered_samples",
Help: "Tracks the number of samples filtered per federation",
}),
gaugeFederateErrors: promauto.With(cfg.Registry).NewGauge(prometheus.GaugeOpts{
Name: "federate_errors",
Help: "The number of times forwarding federated metrics has failed",
}),
m: cfg.Metrics,
}

if w.interval == 0 {
w.interval = 4*time.Minute + 30*time.Second
}

fromClient, toClient, transformer, err := createClients(cfg, w.interval, logger)
fromClient, toClient, transformer, err := createClients(cfg, w.m.clientMetrics, w.interval, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -326,7 +349,7 @@ func (w *Worker) Run(ctx context.Context) {
w.lock.Unlock()

if err := w.forward(ctx); err != nil {
w.gaugeFederateErrors.Inc()
w.m.gaugeFederateErrors.Inc()
rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err)
wait = time.Minute
}
Expand Down Expand Up @@ -389,8 +412,8 @@ func (w *Worker) forward(ctx context.Context) error {
families = metricfamily.Pack(families)
after := metricfamily.MetricsCount(families)

w.gaugeFederateSamples.Set(float64(before))
w.gaugeFederateFilteredSamples.Set(float64(before - after))
w.m.gaugeFederateSamples.Set(float64(before))
w.m.gaugeFederateFilteredSamples.Set(float64(before - after))

w.lastMetrics = families

Expand Down
13 changes: 9 additions & 4 deletions collectors/metrics/pkg/forwarder/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
Expand Down Expand Up @@ -124,6 +125,7 @@ func TestNew(t *testing.T) {
}

for i := range tc {
tc[i].c.Metrics = NewWorkerMetrics(prometheus.NewRegistry())
if _, err := New(tc[i].c); (err != nil) != tc[i].err {
no := "no"
if tc[i].err {
Expand All @@ -140,8 +142,9 @@ func TestReconfigure(t *testing.T) {
t.Fatalf("failed to parse `from` URL: %v", err)
}
c := Config{
From: from,
Logger: log.NewNopLogger(),
From: from,
Logger: log.NewNopLogger(),
Metrics: NewWorkerMetrics(prometheus.NewRegistry()),
}
w, err := New(c)
if err != nil {
Expand Down Expand Up @@ -182,6 +185,7 @@ func TestReconfigure(t *testing.T) {
}

for i := range tc {
tc[i].c.Metrics = NewWorkerMetrics(prometheus.NewRegistry())
if err := w.Reconfigure(tc[i].c); (err != nil) != tc[i].err {
no := "no"
if tc[i].err {
Expand All @@ -206,6 +210,7 @@ func TestRun(t *testing.T) {
From: &url.URL{},
FromQuery: &url.URL{},
Logger: log.NewNopLogger(),
Metrics: NewWorkerMetrics(prometheus.NewRegistry()),
}
w, err := New(c)
if err != nil {
Expand Down Expand Up @@ -235,7 +240,7 @@ func TestRun(t *testing.T) {
if err != nil {
stdlog.Fatalf("failed to parse second test server URL: %v", err)
}
if err := w.Reconfigure(Config{From: from, FromQuery: from, Logger: log.NewNopLogger()}); err != nil {
if err := w.Reconfigure(Config{From: from, FromQuery: from, Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry())}); err != nil {
stdlog.Fatalf("failed to reconfigure worker with second test server url: %v", err)
}
}()
Expand All @@ -248,7 +253,7 @@ func TestRun(t *testing.T) {
}
if err := w.Reconfigure(Config{From: from, FromQuery: from,
RecordingRules: []string{"{\"name\":\"test\",\"query\":\"test\"}"},
Logger: log.NewNopLogger()}); err != nil {
Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry())}); err != nil {
t.Fatalf("failed to reconfigure worker with first test server url: %v", err)
}

Expand Down
5 changes: 3 additions & 2 deletions collectors/metrics/pkg/http/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/http/pprof"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand All @@ -27,8 +28,8 @@ func HealthRoutes(mux *http.ServeMux) *http.ServeMux {
}

// MetricRoutes adds the metrics endpoint to a mux.
func MetricRoutes(mux *http.ServeMux) *http.ServeMux {
mux.Handle("/metrics", promhttp.Handler())
func MetricRoutes(mux *http.ServeMux, reg *prometheus.Registry) *http.ServeMux {
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
return mux
}

Expand Down
Loading

0 comments on commit 81c6df5

Please sign in to comment.