diff --git a/Makefile b/Makefile index 877b1558b6..c4d16007eb 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,9 @@ TMP_DIR := $(shell pwd)/tmp BIN_DIR ?= $(TMP_DIR)/bin GIT ?= $(shell which git) +XARGS ?= $(shell which gxargs 2>/dev/null || which xargs) +GREP ?= $(shell which ggrep 2>/dev/null || which grep) + # Image URL to use all building/pushing image targets IMG ?= quay.io/stolostron/multicluster-observability-operator:latest @@ -33,16 +36,16 @@ docker-push: unit-tests: unit-tests-operators unit-tests-loaders unit-tests-proxy unit-tests-collectors unit-tests-operators: - go test -v ${VERBOSE} `go list ./operators/... | grep -v test` + go test -v ${VERBOSE} `go list ./operators/... | $(GREP) -v test` unit-tests-loaders: - go test -v ${VERBOSE} `go list ./loaders/... | grep -v test` + go test -v ${VERBOSE} `go list ./loaders/... | $(GREP) -v test` unit-tests-proxy: - go test -v ${VERBOSE} `go list ./proxy/... | grep -v test` + go test -v ${VERBOSE} `go list ./proxy/... | $(GREP) -v test` unit-tests-collectors: - go test ${VERBOSE} `go list ./collectors/... | grep -v test` + go test ${VERBOSE} `go list ./collectors/... | $(GREP) -v test` .PHONY: e2e-tests e2e-tests: @@ -93,6 +96,25 @@ shell-format: $(SHFMT) format: ## Formats code including imports. format: go-format shell-format +define require_clean_work_tree + @git update-index -q --ignore-submodules --refresh + + @if ! git diff-files --quiet --ignore-submodules --; then \ + echo >&2 "cannot $1: you have unstaged changes."; \ + git diff -r --ignore-submodules -- >&2; \ + echo >&2 "Please commit or stash them."; \ + exit 1; \ + fi + + @if ! git diff-index --cached --quiet HEAD --ignore-submodules --; then \ + echo >&2 "cannot $1: your index contains uncommitted changes."; \ + git diff --cached -r --ignore-submodules HEAD -- >&2; \ + echo >&2 "Please commit or stash them."; \ + exit 1; \ + fi + +endef + # PROTIP: # Add # --cpu-profile-path string Path to CPU profile output file @@ -112,15 +134,13 @@ NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/cl NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\ github.com/NYTimes/gziphandler.{GzipHandler}=github.com/klauspost/compress/gzhttp.{GzipHandler},\ sync/atomic=go.uber.org/atomic,\ -io/ioutil.{Discard,NopCloser,ReadAll,ReadDir,ReadFile,TempDir,TempFile,Writefile}" $(shell go list ./... | grep -v "internal/cortex") +io/ioutil.{Discard,NopCloser,ReadAll,ReadDir,ReadFile,TempDir,TempFile,Writefile}" ./... @$(FAILLINT) -paths "fmt.{Print,Println}" -ignore-tests ./... @echo ">> examining all of the Go files" @go vet -stdmethods=false ./... @echo ">> linting all of the Go files GOGC=${GOGC}" @$(GOLANGCI_LINT) run # TODO(saswatamcode): Enable this in a separate commit. - # @echo ">> ensuring Copyright headers" + @echo ">> ensuring Copyright headers $(MISS)" # @go run ./scripts/copyright - @echo ">> detecting misspells" - @find . -type f | grep -v vendor/ | grep -vE '\./\..*' | gxargs $(MISSPELL) -error $(call require_clean_work_tree,'detected files without copyright, run make lint and commit changes') \ No newline at end of file diff --git a/collectors/metrics/cmd/metrics-collector/main.go b/collectors/metrics/cmd/metrics-collector/main.go index b7b0338584..f3fbd5ff87 100644 --- a/collectors/metrics/cmd/metrics-collector/main.go +++ b/collectors/metrics/cmd/metrics-collector/main.go @@ -306,8 +306,8 @@ func (o *Options) Run() error { return err } - cfg.Registry = metricsReg - + metrics := forwarder.NewWorkerMetrics(metricsReg) + cfg.Metrics = metrics worker, err := forwarder.New(*cfg) if err != nil { return errors.Wrap(err, "failed to configure metrics collector") @@ -357,7 +357,7 @@ func (o *Options) Run() error { handlers := http.NewServeMux() collectorhttp.DebugRoutes(handlers) collectorhttp.HealthRoutes(handlers) - collectorhttp.MetricRoutes(handlers) + collectorhttp.MetricRoutes(handlers, metricsReg) collectorhttp.ReloadRoutes(handlers, func() error { return worker.Reconfigure(*cfg) }) @@ -384,7 +384,7 @@ func (o *Options) Run() error { } } - err = runMultiWorkers(o) + err = runMultiWorkers(o, cfg) if err != nil { return err } @@ -406,7 +406,7 @@ func (o *Options) Run() error { return g.Run() } -func runMultiWorkers(o *Options) error { +func runMultiWorkers(o *Options, cfg *forwarder.Config) error { for i := 1; i < int(o.WorkerNum); i++ { opt := &Options{ From: o.From, @@ -443,6 +443,7 @@ func runMultiWorkers(o *Options) error { return err } + forwardCfg.Metrics = cfg.Metrics forwardWorker, err := forwarder.New(*forwardCfg) if err != nil { return errors.Wrap(err, "failed to configure metrics collector") diff --git a/collectors/metrics/cmd/metrics-collector/main_test.go b/collectors/metrics/cmd/metrics-collector/main_test.go index 2e3f54a0af..cc12000224 100644 --- a/collectors/metrics/cmd/metrics-collector/main_test.go +++ b/collectors/metrics/cmd/metrics-collector/main_test.go @@ -8,6 +8,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger" ) @@ -41,7 +43,7 @@ func TestMultiWorkers(t *testing.T) { stdlog.SetOutput(log.NewStdlibAdapter(l)) opt.Logger = l - err := runMultiWorkers(opt) + err := runMultiWorkers(opt, &forwarder.Config{Metrics: forwarder.NewWorkerMetrics(prometheus.NewRegistry())}) if err != nil { t.Fatal(err) } diff --git a/collectors/metrics/pkg/collectrule/evaluator.go b/collectors/metrics/pkg/collectrule/evaluator.go index 2c4392bf6c..a5f87f495e 100644 --- a/collectors/metrics/pkg/collectrule/evaluator.go +++ b/collectors/metrics/pkg/collectrule/evaluator.go @@ -84,7 +84,8 @@ func New(cfg forwarder.Config) (*Evaluator, error) { LimitBytes: cfg.LimitBytes, Transformer: cfg.Transformer, - Logger: cfg.Logger, + Logger: cfg.Logger, + Metrics: cfg.Metrics, } from := &url.URL{ Scheme: cfg.From.Scheme, @@ -107,7 +108,7 @@ func New(cfg forwarder.Config) (*Evaluator, error) { evaluator.interval = 30 * time.Second } - fromClient, err := forwarder.CreateFromClient(cfg, evaluator.interval, "evaluate_query", cfg.Logger) + fromClient, err := forwarder.CreateFromClient(cfg, cfg.Metrics, evaluator.interval, "evaluate_query", cfg.Logger) if err != nil { return nil, err } diff --git a/collectors/metrics/pkg/forwarder/forwarder.go b/collectors/metrics/pkg/forwarder/forwarder.go index fcea66e40e..b72b0588b5 100644 --- a/collectors/metrics/pkg/forwarder/forwarder.go +++ b/collectors/metrics/pkg/forwarder/forwarder.go @@ -68,7 +68,7 @@ type Config struct { Logger log.Logger SimulatedTimeseriesFile string - Registry *prometheus.Registry + Metrics *workerMetrics } // Worker represents a metrics forwarding agent. It collects metrics from a source URL and forwards them to a sink. @@ -96,12 +96,10 @@ type Worker struct { status status.StatusReport - gaugeFederateSamples prometheus.Gauge - gaugeFederateFilteredSamples prometheus.Gauge - gaugeFederateErrors prometheus.Gauge + m *workerMetrics } -func CreateFromClient(cfg Config, interval time.Duration, name string, +func CreateFromClient(cfg Config, m *workerMetrics, interval time.Duration, name string, logger log.Logger) (*metricsclient.Client, error) { fromTransport := metricsclient.DefaultTransport(logger, false) if len(cfg.FromCAFile) > 0 { @@ -148,12 +146,12 @@ func CreateFromClient(cfg Config, interval time.Duration, name string, fromClient.Transport = metricshttp.NewBearerRoundTripper(cfg.FromToken, fromClient.Transport) } - from := metricsclient.New(logger, cfg.Registry, fromClient, cfg.LimitBytes, interval, "federate_from") + from := metricsclient.New(logger, m.clientMetrics, fromClient, cfg.LimitBytes, interval, "federate_from") return from, nil } -func createClients(cfg Config, interval time.Duration, +func createClients(cfg Config, m *metricsclient.ClientMetrics, interval time.Duration, logger log.Logger) (*metricsclient.Client, *metricsclient.Client, metricfamily.MultiTransformer, error) { var transformer metricfamily.MultiTransformer @@ -181,7 +179,7 @@ func createClients(cfg Config, interval time.Duration, if len(cfg.AnonymizeLabels) > 0 { transformer.With(metricfamily.NewMetricsAnonymizer(anonymizeSalt, cfg.AnonymizeLabels, nil)) } - from, err := CreateFromClient(cfg, interval, "federate_from", logger) + from, err := CreateFromClient(cfg, cfg.Metrics, interval, "federate_from", logger) if err != nil { return nil, nil, transformer, err } @@ -197,10 +195,46 @@ func createClients(cfg Config, interval time.Duration, if cfg.Debug { toClient.Transport = metricshttp.NewDebugRoundTripper(logger, toClient.Transport) } - to := metricsclient.New(logger, cfg.Registry, toClient, cfg.LimitBytes, interval, "federate_to") + to := metricsclient.New(logger, m, toClient, cfg.LimitBytes, interval, "federate_to") return from, to, transformer, nil } +type workerMetrics struct { + gaugeFederateSamples prometheus.Gauge + gaugeFederateFilteredSamples prometheus.Gauge + gaugeFederateErrors prometheus.Gauge + + clientMetrics *metricsclient.ClientMetrics +} + +func NewWorkerMetrics(reg *prometheus.Registry) *workerMetrics { + return &workerMetrics{ + gaugeFederateSamples: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "federate_samples", + Help: "Tracks the number of samples per federation", + }), + gaugeFederateFilteredSamples: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "federate_filtered_samples", + Help: "Tracks the number of samples filtered per federation", + }), + gaugeFederateErrors: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "federate_errors", + Help: "The number of times forwarding federated metrics has failed", + }), + + clientMetrics: &metricsclient.ClientMetrics{ + GaugeRequestRetrieve: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "metricsclient_request_retrieve", + Help: "Tracks the number of metrics retrievals", + }, []string{"client", "status_code"}), + GaugeRequestSend: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "metricsclient_request_send", + Help: "Tracks the number of metrics sends", + }, []string{"client", "status_code"}), + }, + } +} + // New creates a new Worker based on the provided Config. If the Config contains invalid // values, then an error is returned. func New(cfg Config) (*Worker, error) { @@ -217,25 +251,14 @@ func New(cfg Config) (*Worker, error) { to: cfg.ToUpload, logger: log.With(cfg.Logger, "component", "forwarder/worker"), simulatedTimeseriesFile: cfg.SimulatedTimeseriesFile, - gaugeFederateSamples: promauto.With(cfg.Registry).NewGauge(prometheus.GaugeOpts{ - Name: "federate_samples", - Help: "Tracks the number of samples per federation", - }), - gaugeFederateFilteredSamples: promauto.With(cfg.Registry).NewGauge(prometheus.GaugeOpts{ - Name: "federate_filtered_samples", - Help: "Tracks the number of samples filtered per federation", - }), - gaugeFederateErrors: promauto.With(cfg.Registry).NewGauge(prometheus.GaugeOpts{ - Name: "federate_errors", - Help: "The number of times forwarding federated metrics has failed", - }), + m: cfg.Metrics, } if w.interval == 0 { w.interval = 4*time.Minute + 30*time.Second } - fromClient, toClient, transformer, err := createClients(cfg, w.interval, logger) + fromClient, toClient, transformer, err := createClients(cfg, w.m.clientMetrics, w.interval, logger) if err != nil { return nil, err } @@ -326,7 +349,7 @@ func (w *Worker) Run(ctx context.Context) { w.lock.Unlock() if err := w.forward(ctx); err != nil { - w.gaugeFederateErrors.Inc() + w.m.gaugeFederateErrors.Inc() rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err) wait = time.Minute } @@ -389,8 +412,8 @@ func (w *Worker) forward(ctx context.Context) error { families = metricfamily.Pack(families) after := metricfamily.MetricsCount(families) - w.gaugeFederateSamples.Set(float64(before)) - w.gaugeFederateFilteredSamples.Set(float64(before - after)) + w.m.gaugeFederateSamples.Set(float64(before)) + w.m.gaugeFederateFilteredSamples.Set(float64(before - after)) w.lastMetrics = families diff --git a/collectors/metrics/pkg/forwarder/forwarder_test.go b/collectors/metrics/pkg/forwarder/forwarder_test.go index e25de3c3ee..e36965c0f8 100644 --- a/collectors/metrics/pkg/forwarder/forwarder_test.go +++ b/collectors/metrics/pkg/forwarder/forwarder_test.go @@ -12,6 +12,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" ) func init() { @@ -124,6 +125,7 @@ func TestNew(t *testing.T) { } for i := range tc { + tc[i].c.Metrics = NewWorkerMetrics(prometheus.NewRegistry()) if _, err := New(tc[i].c); (err != nil) != tc[i].err { no := "no" if tc[i].err { @@ -140,8 +142,9 @@ func TestReconfigure(t *testing.T) { t.Fatalf("failed to parse `from` URL: %v", err) } c := Config{ - From: from, - Logger: log.NewNopLogger(), + From: from, + Logger: log.NewNopLogger(), + Metrics: NewWorkerMetrics(prometheus.NewRegistry()), } w, err := New(c) if err != nil { @@ -182,6 +185,7 @@ func TestReconfigure(t *testing.T) { } for i := range tc { + tc[i].c.Metrics = NewWorkerMetrics(prometheus.NewRegistry()) if err := w.Reconfigure(tc[i].c); (err != nil) != tc[i].err { no := "no" if tc[i].err { @@ -206,6 +210,7 @@ func TestRun(t *testing.T) { From: &url.URL{}, FromQuery: &url.URL{}, Logger: log.NewNopLogger(), + Metrics: NewWorkerMetrics(prometheus.NewRegistry()), } w, err := New(c) if err != nil { @@ -235,7 +240,7 @@ func TestRun(t *testing.T) { if err != nil { stdlog.Fatalf("failed to parse second test server URL: %v", err) } - if err := w.Reconfigure(Config{From: from, FromQuery: from, Logger: log.NewNopLogger()}); err != nil { + if err := w.Reconfigure(Config{From: from, FromQuery: from, Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry())}); err != nil { stdlog.Fatalf("failed to reconfigure worker with second test server url: %v", err) } }() @@ -248,7 +253,7 @@ func TestRun(t *testing.T) { } if err := w.Reconfigure(Config{From: from, FromQuery: from, RecordingRules: []string{"{\"name\":\"test\",\"query\":\"test\"}"}, - Logger: log.NewNopLogger()}); err != nil { + Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry())}); err != nil { t.Fatalf("failed to reconfigure worker with first test server url: %v", err) } diff --git a/collectors/metrics/pkg/http/routes.go b/collectors/metrics/pkg/http/routes.go index 85f0cf0e73..cba2b3bdbf 100644 --- a/collectors/metrics/pkg/http/routes.go +++ b/collectors/metrics/pkg/http/routes.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/pprof" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -27,8 +28,8 @@ func HealthRoutes(mux *http.ServeMux) *http.ServeMux { } // MetricRoutes adds the metrics endpoint to a mux. -func MetricRoutes(mux *http.ServeMux) *http.ServeMux { - mux.Handle("/metrics", promhttp.Handler()) +func MetricRoutes(mux *http.ServeMux, reg *prometheus.Registry) *http.ServeMux { + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) return mux } diff --git a/collectors/metrics/pkg/metricsclient/metricsclient.go b/collectors/metrics/pkg/metricsclient/metricsclient.go index 0a594760d8..9b835ff552 100644 --- a/collectors/metrics/pkg/metricsclient/metricsclient.go +++ b/collectors/metrics/pkg/metricsclient/metricsclient.go @@ -24,7 +24,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" clientmodel "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/prometheus/model/labels" @@ -48,29 +47,26 @@ type Client struct { metricsName string logger log.Logger - gaugeRequestRetrieve *prometheus.GaugeVec - gaugeRequestSend *prometheus.GaugeVec + m *ClientMetrics +} + +type ClientMetrics struct { + GaugeRequestRetrieve *prometheus.GaugeVec + GaugeRequestSend *prometheus.GaugeVec } type PartitionedMetrics struct { Families []*clientmodel.MetricFamily } -func New(logger log.Logger, reg *prometheus.Registry, client *http.Client, maxBytes int64, timeout time.Duration, metricsName string) *Client { +func New(logger log.Logger, m *ClientMetrics, client *http.Client, maxBytes int64, timeout time.Duration, metricsName string) *Client { return &Client{ client: client, maxBytes: maxBytes, timeout: timeout, metricsName: metricsName, logger: log.With(logger, "component", "metricsclient"), - gaugeRequestRetrieve: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "metricsclient_request_retrieve", - Help: "Tracks the number of metrics retrievals", - }, []string{"client", "status_code"}), - gaugeRequestSend: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "metricsclient_request_send", - Help: "Tracks the number of metrics sends", - }, []string{"client", "status_code"}), + m: m, } } @@ -101,18 +97,18 @@ func (c *Client) RetrievRecordingMetrics( err := withCancel(ctx, c.client, req, func(resp *http.Response) error { switch resp.StatusCode { case http.StatusOK: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "200").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "200").Inc() case http.StatusUnauthorized: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "401").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "401").Inc() return errors.Newf("prometheus server requires authentication: %s", resp.Request.URL) case http.StatusForbidden: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "403").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "403").Inc() return errors.Newf("prometheus server forbidden: %s", resp.Request.URL) case http.StatusBadRequest: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "400").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "400").Inc() return errors.Newf("bad request: %s", resp.Request.URL) default: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, strconv.Itoa(resp.StatusCode)).Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, strconv.Itoa(resp.StatusCode)).Inc() return errors.Newf("prometheus server reported unexpected error code: %d", resp.StatusCode) } @@ -193,18 +189,18 @@ func (c *Client) Retrieve(ctx context.Context, req *http.Request) ([]*clientmode err := withCancel(ctx, c.client, req, func(resp *http.Response) error { switch resp.StatusCode { case http.StatusOK: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "200").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "200").Inc() case http.StatusUnauthorized: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "401").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "401").Inc() return errors.Newf("prometheus server requires authentication: %s", resp.Request.URL) case http.StatusForbidden: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "403").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "403").Inc() return errors.Newf("prometheus server forbidden: %s", resp.Request.URL) case http.StatusBadRequest: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, "400").Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, "400").Inc() return errors.Newf("bad request: %s", resp.Request.URL) default: - c.gaugeRequestRetrieve.WithLabelValues(c.metricsName, strconv.Itoa(resp.StatusCode)).Inc() + c.m.GaugeRequestRetrieve.WithLabelValues(c.metricsName, strconv.Itoa(resp.StatusCode)).Inc() return errors.Newf("prometheus server reported unexpected error code: %d", resp.StatusCode) } @@ -261,19 +257,19 @@ func (c *Client) Send(ctx context.Context, req *http.Request, families []*client logger.Log(c.logger, logger.Debug, "msg", resp.StatusCode) switch resp.StatusCode { case http.StatusOK: - c.gaugeRequestSend.WithLabelValues(c.metricsName, "200").Inc() + c.m.GaugeRequestSend.WithLabelValues(c.metricsName, "200").Inc() case http.StatusUnauthorized: - c.gaugeRequestSend.WithLabelValues(c.metricsName, "401").Inc() + c.m.GaugeRequestSend.WithLabelValues(c.metricsName, "401").Inc() return errors.Newf("gateway server requires authentication: %s", resp.Request.URL) case http.StatusForbidden: - c.gaugeRequestSend.WithLabelValues(c.metricsName, "403").Inc() + c.m.GaugeRequestSend.WithLabelValues(c.metricsName, "403").Inc() return errors.Newf("gateway server forbidden: %s", resp.Request.URL) case http.StatusBadRequest: - c.gaugeRequestSend.WithLabelValues(c.metricsName, "400").Inc() + c.m.GaugeRequestSend.WithLabelValues(c.metricsName, "400").Inc() logger.Log(c.logger, logger.Debug, "msg", resp.Body) return errors.Newf("gateway server bad request: %s", resp.Request.URL) default: - c.gaugeRequestSend.WithLabelValues(c.metricsName, strconv.Itoa(resp.StatusCode)).Inc() + c.m.GaugeRequestSend.WithLabelValues(c.metricsName, strconv.Itoa(resp.StatusCode)).Inc() body, _ := io.ReadAll(resp.Body) if len(body) > 1024 { body = body[:1024]