From a0e6491b4d4dd4ffe790361d18134282cdaf7d8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraj=20Mich=C3=A1lek?= Date: Wed, 22 Nov 2023 18:32:30 +0100 Subject: [PATCH] [exporter/prometheusremotewrite] prometheusremotewrite exporter add option to send metadata (#27565) **Description:** This PR adds an option to send metric Metadata to prometheus compatible backend (disabled by default). This contains information such as metric descrtiption, type, unit, and name. **Link to tracking Issue:** #13849 **Testing:** Tested in our testing environment with locally built image. **Documentation:** --------- Co-authored-by: Antoine Toulme Co-authored-by: Anthony Mirabella --- ...ote-write-add-option-to-send-metadata.yaml | 27 +++ .../prometheusremotewriteexporter/README.md | 1 + .../prometheusremotewriteexporter/config.go | 3 + .../prometheusremotewriteexporter/exporter.go | 13 +- .../exporter_test.go | 4 +- .../prometheusremotewriteexporter/factory.go | 1 + .../prometheusremotewriteexporter/helper.go | 34 ++- .../helper_test.go | 2 +- .../prometheusremotewrite/metrics_to_prw.go | 1 + .../otlp_to_openmetrics_metadata.go | 64 +++++ .../otlp_to_openmetrics_metadata_test.go | 226 ++++++++++++++++++ 11 files changed, 368 insertions(+), 8 deletions(-) create mode 100755 .chloggen/prometheus-remote-write-add-option-to-send-metadata.yaml create mode 100644 pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go create mode 100644 pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata_test.go diff --git a/.chloggen/prometheus-remote-write-add-option-to-send-metadata.yaml b/.chloggen/prometheus-remote-write-add-option-to-send-metadata.yaml new file mode 100755 index 000000000000..45199deb9018 --- /dev/null +++ b/.chloggen/prometheus-remote-write-add-option-to-send-metadata.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: prometheusremotewrite exporter add option to send metadata + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [ 13849 ] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index f15b3c4b76e8..d230c2f9765c 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -52,6 +52,7 @@ The following settings can be optionally configured: - *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.* - `namespace`: prefix attached to each exported metric name. - `add_metric_suffixes`: If set to false, type and unit suffixes will not be added to metrics. Default: true. +- `send_metadata`: If set to true, prometheus metadata will be generated and sent. Default: false. - `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes. - `enabled`: enable the sending queue (default: `true`) - `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`) diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go index 22b0b3a969d9..1f854918d2d5 100644 --- a/exporter/prometheusremotewriteexporter/config.go +++ b/exporter/prometheusremotewriteexporter/config.go @@ -48,6 +48,9 @@ type Config struct { // AddMetricSuffixes controls whether unit and type suffixes are added to metrics on export AddMetricSuffixes bool `mapstructure:"add_metric_suffixes"` + + // SendMetadata controls whether prometheus metadata will be generated and sent + SendMetadata bool `mapstructure:"send_metadata"` } type CreatedMetric struct { diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 865abcc06636..cc7a65f294b1 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -77,6 +77,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err DisableTargetInfo: !cfg.TargetInfo.Enabled, ExportCreatedMetric: cfg.CreatedMetric.Enabled, AddMetricSuffixes: cfg.AddMetricSuffixes, + SendMetadata: cfg.SendMetadata, }, } if cfg.WAL == nil { @@ -130,12 +131,18 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er case <-prwe.closeChan: return errors.New("shutdown has been called") default: + tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) if err != nil { err = consumererror.NewPermanent(err) } + + var m []*prompb.MetricMetadata + if prwe.exporterSettings.SendMetadata { + m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes) + } // Call export even if a conversion error, since there may be points that were successfully converted. - return multierr.Combine(err, prwe.handleExport(ctx, tsMap)) + return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m)) } } @@ -151,14 +158,14 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) { return sanitizedLabels, nil } -func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { +func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error { // There are no metrics to export, so return. if len(tsMap) == 0 { return nil } // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes) + requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m) if err != nil { return err } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 2a317cebb91d..9bcfcac1a2a0 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -368,7 +368,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { return err } - return prwe.handleExport(context.Background(), testmap) + return prwe.handleExport(context.Background(), testmap, nil) } // Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as @@ -919,7 +919,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) { "timeseries1": ts1, "timeseries2": ts2, } - errs := prwe.handleExport(ctx, tsMap) + errs := prwe.handleExport(ctx, tsMap, nil) assert.NoError(t, errs) // Shutdown after we've written to the WAL. This ensures that our // exported data in-flight will flushed flushed to the WAL before exiting. diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 63135b9b8cb5..f90aa41b21ac 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -81,6 +81,7 @@ func createDefaultConfig() component.Config { Multiplier: backoff.DefaultMultiplier, }, AddMetricSuffixes: true, + SendMetadata: false, HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "http://some.url:9411/api/prom/push", // We almost read 0 bytes, so no need to tune ReadBufferSize. diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index b468609380c8..d5eca3086a7e 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -11,12 +11,12 @@ import ( ) // batchTimeSeries splits series into multiple batch write requests. -func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) ([]*prompb.WriteRequest, error) { +func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) { if len(tsMap) == 0 { return nil, errors.New("invalid tsMap: cannot be empty map") } - requests := make([]*prompb.WriteRequest, 0, len(tsMap)) + requests := make([]*prompb.WriteRequest, 0, len(tsMap)+len(m)) tsArray := make([]prompb.TimeSeries, 0, len(tsMap)) sizeOfCurrentBatch := 0 @@ -42,6 +42,30 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) requests = append(requests, wrapped) } + mArray := make([]prompb.MetricMetadata, 0, len(m)) + sizeOfCurrentBatch = 0 + i = 0 + for _, v := range m { + sizeOfM := v.Size() + + if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize { + wrapped := convertMetadataToRequest(mArray) + requests = append(requests, wrapped) + + mArray = make([]prompb.MetricMetadata, 0, len(m)-i) + sizeOfCurrentBatch = 0 + } + + mArray = append(mArray, *v) + sizeOfCurrentBatch += sizeOfM + i++ + } + + if len(mArray) != 0 { + wrapped := convertMetadataToRequest(mArray) + requests = append(requests, wrapped) + } + return requests, nil } @@ -57,6 +81,12 @@ func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteReques } } +func convertMetadataToRequest(m []prompb.MetricMetadata) *prompb.WriteRequest { + return &prompb.WriteRequest{ + Metadata: m, + } +} + func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries { for i := range tsArray { sL := tsArray[i].Samples diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index 9528607a6fd9..8404bb6c9e9e 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -57,7 +57,7 @@ func Test_batchTimeSeries(t *testing.T) { // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize) + requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil) if tt.returnErr { assert.Error(t, err) return diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw.go b/pkg/translator/prometheusremotewrite/metrics_to_prw.go index 2889a4e9640a..c189f299d037 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw.go @@ -21,6 +21,7 @@ type Settings struct { DisableTargetInfo bool ExportCreatedMetric bool AddMetricSuffixes bool + SendMetadata bool } // FromMetrics converts pmetric.Metrics to prometheus remote write format. diff --git a/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go b/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go new file mode 100644 index 000000000000..b2564a3d187f --- /dev/null +++ b/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" + +import ( + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/pdata/pmetric" + + prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +) + +func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) prompb.MetricMetadata_MetricType { + switch otelMetric.Type() { + case pmetric.MetricTypeGauge: + return prompb.MetricMetadata_GAUGE + case pmetric.MetricTypeSum: + metricType := prompb.MetricMetadata_GAUGE + if otelMetric.Sum().IsMonotonic() { + metricType = prompb.MetricMetadata_COUNTER + } + return metricType + case pmetric.MetricTypeHistogram: + return prompb.MetricMetadata_HISTOGRAM + case pmetric.MetricTypeSummary: + return prompb.MetricMetadata_SUMMARY + case pmetric.MetricTypeExponentialHistogram: + return prompb.MetricMetadata_HISTOGRAM + } + return prompb.MetricMetadata_UNKNOWN +} + +func OtelMetricsToMetadata(md pmetric.Metrics, addMetricSuffixes bool) []*prompb.MetricMetadata { + resourceMetricsSlice := md.ResourceMetrics() + + metadataLength := 0 + for i := 0; i < resourceMetricsSlice.Len(); i++ { + scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics() + for j := 0; j < scopeMetricsSlice.Len(); j++ { + metadataLength += scopeMetricsSlice.At(j).Metrics().Len() + } + } + + var metadata = make([]*prompb.MetricMetadata, 0, metadataLength) + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + + for j := 0; j < scopeMetricsSlice.Len(); j++ { + scopeMetrics := scopeMetricsSlice.At(j) + for k := 0; k < scopeMetrics.Metrics().Len(); k++ { + metric := scopeMetrics.Metrics().At(k) + entry := prompb.MetricMetadata{ + Type: otelMetricTypeToPromMetricType(metric), + MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", addMetricSuffixes), + Help: metric.Description(), + } + metadata = append(metadata, &entry) + } + } + } + + return metadata +} diff --git a/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata_test.go b/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata_test.go new file mode 100644 index 000000000000..6807ed7319d1 --- /dev/null +++ b/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata_test.go @@ -0,0 +1,226 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" + prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +) + +func TestOtelMetricTypeToPromMetricType(t *testing.T) { + ts := uint64(time.Now().UnixNano()) + tests := []struct { + name string + metric func() pmetric.Metric + want prompb.MetricMetadata_MetricType + }{ + { + name: "gauge", + metric: func() pmetric.Metric { + return getIntGaugeMetric( + "test", + pcommon.NewMap(), + 1, ts, + ) + }, + want: prompb.MetricMetadata_GAUGE, + }, + { + name: "sum", + metric: func() pmetric.Metric { + return getIntSumMetric( + "test", + pcommon.NewMap(), + pmetric.AggregationTemporalityCumulative, + 1, ts, + ) + }, + want: prompb.MetricMetadata_GAUGE, + }, + { + name: "monotonic cumulative", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(true) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleValue(1) + + return metric + }, + want: prompb.MetricMetadata_COUNTER, + }, + { + name: "cumulative histogram", + metric: func() pmetric.Metric { + m := getHistogramMetric("", pcommon.NewMap(), pmetric.AggregationTemporalityCumulative, 0, 0, 0, []float64{}, []uint64{}) + return m + }, + want: prompb.MetricMetadata_HISTOGRAM, + }, + { + name: "cumulative exponential histogram", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + h := metric.SetEmptyExponentialHistogram() + h.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + return metric + }, + want: prompb.MetricMetadata_HISTOGRAM, + }, + { + name: "summary with start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_summary") + metric.SetEmptySummary() + + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(ts)) + dp.SetStartTimestamp(pcommon.Timestamp(ts)) + + return metric + }, + want: prompb.MetricMetadata_SUMMARY, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := tt.metric() + + metricType := otelMetricTypeToPromMetricType(metric) + + assert.Equal(t, tt.want, metricType) + }) + } +} + +func TestOtelMetricsToMetadata(t *testing.T) { + ts := uint64(time.Now().UnixNano()) + tests := []struct { + name string + metrics pmetric.Metrics + want []*prompb.MetricMetadata + }{ + { + name: "all types§", + metrics: GenerateMetricsAllTypesNoDataPointsHelp(), + want: []*prompb.MetricMetadata{ + { + Type: prompb.MetricMetadata_GAUGE, + MetricFamilyName: prometheustranslator.BuildCompliantName(getIntGaugeMetric( + testdata.TestGaugeDoubleMetricName, + pcommon.NewMap(), + 1, ts, + ), "", false), + Help: "gauge description", + }, + { + Type: prompb.MetricMetadata_GAUGE, + MetricFamilyName: prometheustranslator.BuildCompliantName(getIntGaugeMetric( + testdata.TestGaugeIntMetricName, + pcommon.NewMap(), + 1, ts, + ), "", false), + Help: "gauge description", + }, + { + Type: prompb.MetricMetadata_COUNTER, + MetricFamilyName: prometheustranslator.BuildCompliantName(getIntGaugeMetric( + testdata.TestSumDoubleMetricName, + pcommon.NewMap(), + 1, ts, + ), "", false), + Help: "sum description", + }, + { + Type: prompb.MetricMetadata_COUNTER, + MetricFamilyName: prometheustranslator.BuildCompliantName(getIntGaugeMetric( + testdata.TestSumIntMetricName, + pcommon.NewMap(), + 1, ts, + ), "", false), + Help: "sum description", + }, + { + Type: prompb.MetricMetadata_HISTOGRAM, + MetricFamilyName: prometheustranslator.BuildCompliantName(getIntGaugeMetric( + testdata.TestDoubleHistogramMetricName, + pcommon.NewMap(), + 1, ts, + ), "", false), + Help: "histogram description", + }, + { + Type: prompb.MetricMetadata_SUMMARY, + MetricFamilyName: prometheustranslator.BuildCompliantName(getIntGaugeMetric( + testdata.TestDoubleSummaryMetricName, + pcommon.NewMap(), + 1, ts, + ), "", false), + Help: "summary description", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metaData := OtelMetricsToMetadata(tt.metrics, false) + + for i := 0; i < len(metaData); i++ { + assert.Equal(t, tt.want[i].Type, metaData[i].Type) + assert.Equal(t, tt.want[i].MetricFamilyName, metaData[i].MetricFamilyName) + assert.Equal(t, tt.want[i].Help, metaData[i].Help) + } + + }) + } +} + +func GenerateMetricsAllTypesNoDataPointsHelp() pmetric.Metrics { + md := testdata.GenerateMetricsOneEmptyInstrumentationLibrary() + ilm0 := md.ResourceMetrics().At(0).ScopeMetrics().At(0) + ms := ilm0.Metrics() + initMetric(ms.AppendEmpty(), testdata.TestGaugeDoubleMetricName, pmetric.MetricTypeGauge, "gauge description") + initMetric(ms.AppendEmpty(), testdata.TestGaugeIntMetricName, pmetric.MetricTypeGauge, "gauge description") + initMetric(ms.AppendEmpty(), testdata.TestSumDoubleMetricName, pmetric.MetricTypeSum, "sum description") + initMetric(ms.AppendEmpty(), testdata.TestSumIntMetricName, pmetric.MetricTypeSum, "sum description") + initMetric(ms.AppendEmpty(), testdata.TestDoubleHistogramMetricName, pmetric.MetricTypeHistogram, "histogram description") + initMetric(ms.AppendEmpty(), testdata.TestDoubleSummaryMetricName, pmetric.MetricTypeSummary, "summary description") + return md +} + +func initMetric(m pmetric.Metric, name string, ty pmetric.MetricType, desc string) { + m.SetName(name) + m.SetDescription(desc) + //exhaustive:enforce + switch ty { + case pmetric.MetricTypeGauge: + m.SetEmptyGauge() + + case pmetric.MetricTypeSum: + sum := m.SetEmptySum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + case pmetric.MetricTypeHistogram: + histo := m.SetEmptyHistogram() + histo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + case pmetric.MetricTypeExponentialHistogram: + m.SetEmptyExponentialHistogram() + + case pmetric.MetricTypeSummary: + m.SetEmptySummary() + } +}