Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] prometheusremotewrite exporter add o…
Browse files Browse the repository at this point in the history
…ption to send metadata (#27565)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

This PR adds an option to send metric Metadata to prometheus compatible
backend (disabled by default). This contains information such as metric
descrtiption, type, unit, and name.

**Link to tracking Issue:** <Issue number if applicable> #13849

**Testing:** <Describe what testing was performed and which tests were
added.>

Tested in our testing environment with locally built image.

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Antoine Toulme <[email protected]>
Co-authored-by: Anthony Mirabella <[email protected]>
  • Loading branch information
3 people authored Nov 22, 2023
1 parent d0ca48f commit a0e6491
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 8 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prometheus-remote-write-add-option-to-send-metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: prometheusremotewrite exporter add option to send metadata

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [ 13849 ]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The following settings can be optionally configured:
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
- `namespace`: prefix attached to each exported metric name.
- `add_metric_suffixes`: If set to false, type and unit suffixes will not be added to metrics. Default: true.
- `send_metadata`: If set to true, prometheus metadata will be generated and sent. Default: false.
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `enabled`: enable the sending queue (default: `true`)
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
Expand Down
3 changes: 3 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Config struct {

// AddMetricSuffixes controls whether unit and type suffixes are added to metrics on export
AddMetricSuffixes bool `mapstructure:"add_metric_suffixes"`

// SendMetadata controls whether prometheus metadata will be generated and sent
SendMetadata bool `mapstructure:"send_metadata"`
}

type CreatedMetric struct {
Expand Down
13 changes: 10 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
DisableTargetInfo: !cfg.TargetInfo.Enabled,
ExportCreatedMetric: cfg.CreatedMetric.Enabled,
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
}
if cfg.WAL == nil {
Expand Down Expand Up @@ -130,12 +131,18 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
err = consumererror.NewPermanent(err)
}

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}
// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap))
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
}
}

Expand All @@ -151,14 +158,14 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
return nil
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
return err
}

return prwe.handleExport(context.Background(), testmap)
return prwe.handleExport(context.Background(), testmap, nil)
}

// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
Expand Down Expand Up @@ -919,7 +919,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
"timeseries1": ts1,
"timeseries2": ts2,
}
errs := prwe.handleExport(ctx, tsMap)
errs := prwe.handleExport(ctx, tsMap, nil)
assert.NoError(t, errs)
// Shutdown after we've written to the WAL. This ensures that our
// exported data in-flight will flushed flushed to the WAL before exiting.
Expand Down
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func createDefaultConfig() component.Config {
Multiplier: backoff.DefaultMultiplier,
},
AddMetricSuffixes: true,
SendMetadata: false,
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "http://some.url:9411/api/prom/push",
// We almost read 0 bytes, so no need to tune ReadBufferSize.
Expand Down
34 changes: 32 additions & 2 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
)

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

requests := make([]*prompb.WriteRequest, 0, len(tsMap))
requests := make([]*prompb.WriteRequest, 0, len(tsMap)+len(m))
tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
sizeOfCurrentBatch := 0

Expand All @@ -42,6 +42,30 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
requests = append(requests, wrapped)
}

mArray := make([]prompb.MetricMetadata, 0, len(m))
sizeOfCurrentBatch = 0
i = 0
for _, v := range m {
sizeOfM := v.Size()

if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
wrapped := convertMetadataToRequest(mArray)
requests = append(requests, wrapped)

mArray = make([]prompb.MetricMetadata, 0, len(m)-i)
sizeOfCurrentBatch = 0
}

mArray = append(mArray, *v)
sizeOfCurrentBatch += sizeOfM
i++
}

if len(mArray) != 0 {
wrapped := convertMetadataToRequest(mArray)
requests = append(requests, wrapped)
}

return requests, nil
}

Expand All @@ -57,6 +81,12 @@ func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteReques
}
}

func convertMetadataToRequest(m []prompb.MetricMetadata) *prompb.WriteRequest {
return &prompb.WriteRequest{
Metadata: m,
}
}

func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
for i := range tsArray {
sL := tsArray[i].Samples
Expand Down
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Test_batchTimeSeries(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize)
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil)
if tt.returnErr {
assert.Error(t, err)
return
Expand Down
1 change: 1 addition & 0 deletions pkg/translator/prometheusremotewrite/metrics_to_prw.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Settings struct {
DisableTargetInfo bool
ExportCreatedMetric bool
AddMetricSuffixes bool
SendMetadata bool
}

// FromMetrics converts pmetric.Metrics to prometheus remote write format.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pmetric"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) prompb.MetricMetadata_MetricType {
switch otelMetric.Type() {
case pmetric.MetricTypeGauge:
return prompb.MetricMetadata_GAUGE
case pmetric.MetricTypeSum:
metricType := prompb.MetricMetadata_GAUGE
if otelMetric.Sum().IsMonotonic() {
metricType = prompb.MetricMetadata_COUNTER
}
return metricType
case pmetric.MetricTypeHistogram:
return prompb.MetricMetadata_HISTOGRAM
case pmetric.MetricTypeSummary:
return prompb.MetricMetadata_SUMMARY
case pmetric.MetricTypeExponentialHistogram:
return prompb.MetricMetadata_HISTOGRAM
}
return prompb.MetricMetadata_UNKNOWN
}

func OtelMetricsToMetadata(md pmetric.Metrics, addMetricSuffixes bool) []*prompb.MetricMetadata {
resourceMetricsSlice := md.ResourceMetrics()

metadataLength := 0
for i := 0; i < resourceMetricsSlice.Len(); i++ {
scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics()
for j := 0; j < scopeMetricsSlice.Len(); j++ {
metadataLength += scopeMetricsSlice.At(j).Metrics().Len()
}
}

var metadata = make([]*prompb.MetricMetadata, 0, metadataLength)
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
scopeMetricsSlice := resourceMetrics.ScopeMetrics()

for j := 0; j < scopeMetricsSlice.Len(); j++ {
scopeMetrics := scopeMetricsSlice.At(j)
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)
entry := prompb.MetricMetadata{
Type: otelMetricTypeToPromMetricType(metric),
MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", addMetricSuffixes),
Help: metric.Description(),
}
metadata = append(metadata, &entry)
}
}
}

return metadata
}
Loading

0 comments on commit a0e6491

Please sign in to comment.