From 36b389bc2c47603a556d26dbfee052bdf21cc60b Mon Sep 17 00:00:00 2001
From: Juraj Michalek
Date: Tue, 20 Jun 2023 22:47:21 +0200
Subject: [PATCH] feat: initial draft of sending metadata in prometheus remote write

---
 .../prometheusremotewriteexporter/exporter.go | 37 +++++++++++++++++--
 .../prometheusremotewriteexporter/helper.go   | 16 +++++---
 .../otlp_to_openmetrics_metadata.go           | 33 +++++++++++++++++
 3 files changed, 78 insertions(+), 8 deletions(-)
 create mode 100644 pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go

diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go
index 102110994296..ddf4b2378b1a 100644
--- a/exporter/prometheusremotewriteexporter/exporter.go
+++ b/exporter/prometheusremotewriteexporter/exporter.go
@@ -115,6 +115,34 @@ func (prwe *prwExporter) Shutdown(context.Context) error {
 	return err
 }
 
+func (prwe *prwExporter) createMetadata(md pmetric.Metrics) []prompb.MetricMetadata {
+
+	var metadata []prompb.MetricMetadata
+
+	resourceMetricsSlice := md.ResourceMetrics()
+	for i := 0; i < resourceMetricsSlice.Len(); i++ {
+		resourceMetrics := resourceMetricsSlice.At(i)
+		scopeMetricsSlice := resourceMetrics.ScopeMetrics()
+
+		for j := 0; j < scopeMetricsSlice.Len(); j++ {
+			scopeMetrics := scopeMetricsSlice.At(j)
+			for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
+
+				entry := prompb.MetricMetadata{
+					Type:             prompb.MetricMetadata_MetricType(scopeMetrics.Metrics().At(k).Type()),
+					MetricFamilyName: scopeMetrics.Metrics().At(k).Name(),
+					Help:             scopeMetrics.Metrics().At(k).Description(),
+					Unit:             scopeMetrics.Metrics().At(k).Unit(),
+				}
+				metadata = append(metadata, entry)
+			}
+		}
+	}
+
+	return metadata
+
+}
+
 // PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
 // TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
 // exports the map.
@@ -126,12 +154,15 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
 	case <-prwe.closeChan:
 		return errors.New("shutdown has been called")
 	default:
+
+		m := prometheusremotewrite.CreateMetadata(md)
+
 		tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
 		if err != nil {
 			err = consumererror.NewPermanent(err)
 		}
 		// Call export even if a conversion error, since there may be points that were successfully converted.
-		return multierr.Combine(err, prwe.handleExport(ctx, tsMap))
+		return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
 	}
 }
 
@@ -147,14 +178,14 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
 	return sanitizedLabels, nil
 }
 
-func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
+func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []prompb.MetricMetadata) error {
 	// There are no metrics to export, so return.
 	if len(tsMap) == 0 {
 		return nil
 	}
 
 	// Calls the helper function to convert and batch the TsMap to the desired format
-	requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
+	requests, err := batchTimeSeries(tsMap, m, maxBatchByteSize)
 	if err != nil {
 		return err
 	}
diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go
index 2f84d5617191..49fa455591c1 100644
--- a/exporter/prometheusremotewriteexporter/helper.go
+++ b/exporter/prometheusremotewriteexporter/helper.go
@@ -11,7 +11,7 @@ import (
 )
 
 // batchTimeSeries splits series into multiple batch write requests.
-func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
+func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, m []prompb.MetricMetadata, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
 	if len(tsMap) == 0 {
 		return nil, errors.New("invalid tsMap: cannot be empty map")
 	}
@@ -20,11 +20,16 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
 	var tsArray []prompb.TimeSeries
 	sizeOfCurrentBatch := 0
 
+	sizeOfM := 0
+	for _, mi := range m {
+		sizeOfM += mi.Size()
+	}
+
 	for _, v := range tsMap {
 		sizeOfSeries := v.Size()
 
-		if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
-			wrapped := convertTimeseriesToRequest(tsArray)
+		if sizeOfCurrentBatch+sizeOfM+sizeOfSeries >= maxBatchByteSize {
+			wrapped := convertTimeseriesToRequest(tsArray, m)
 			requests = append(requests, wrapped)
 
 			tsArray = nil
@@ -36,14 +41,14 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
 	}
 
 	if len(tsArray) != 0 {
-		wrapped := convertTimeseriesToRequest(tsArray)
+		wrapped := convertTimeseriesToRequest(tsArray, m)
 		requests = append(requests, wrapped)
 	}
 
 	return requests, nil
 }
 
-func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteRequest {
+func convertTimeseriesToRequest(tsArray []prompb.TimeSeries, m []prompb.MetricMetadata) *prompb.WriteRequest {
 	// the remote_write endpoint only requires the timeseries.
 	// otlp defines it's own way to handle metric metadata
 	return &prompb.WriteRequest{
@@ -52,6 +57,7 @@ func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteReques
 		// * https://github.com/open-telemetry/wg-prometheus/issues/10
 		// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
 		Timeseries: orderBySampleTimestamp(tsArray),
+		Metadata:   m,
 	}
 }
 
diff --git a/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go b/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go
new file mode 100644
index 000000000000..97690678019a
--- /dev/null
+++ b/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go
@@ -0,0 +1,33 @@
+package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
+
+import (
+	"github.com/prometheus/prometheus/prompb"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+// CreateMetadata extracts Prometheus remote write metric metadata (type, metric family name, help and unit) from pmetric.Metrics.
+func CreateMetadata(md pmetric.Metrics) []prompb.MetricMetadata {
+	var metadata []prompb.MetricMetadata
+
+	resourceMetricsSlice := md.ResourceMetrics()
+	for i := 0; i < resourceMetricsSlice.Len(); i++ {
+		resourceMetrics := resourceMetricsSlice.At(i)
+		scopeMetricsSlice := resourceMetrics.ScopeMetrics()
+
+		for j := 0; j < scopeMetricsSlice.Len(); j++ {
+			scopeMetrics := scopeMetricsSlice.At(j)
+			for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
+
+				entry := prompb.MetricMetadata{
+					Type:             prompb.MetricMetadata_MetricType(scopeMetrics.Metrics().At(k).Type()),
+					MetricFamilyName: scopeMetrics.Metrics().At(k).Name(),
+					Help:             scopeMetrics.Metrics().At(k).Description(),
+					Unit:             scopeMetrics.Metrics().At(k).Unit(),
+				}
+				metadata = append(metadata, entry)
+			}
+		}
+	}
+
+	return metadata
+}
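
A note on the Type field populated above: prompb.MetricMetadata_MetricType(metric.Type()) casts the pdata metric-type enum directly into the Prometheus remote write enum. The two enums are defined independently (pmetric.MetricType in pdata, MetricMetadata.MetricType in the Prometheus protobuf), so a raw cast is only correct if their numeric values happen to line up. Below is a minimal sketch of an explicit mapping; the helper name otelMetricTypeToPromMetricType and its treatment of non-monotonic sums and exponential histograms are illustrative assumptions, not part of this patch.

package prometheusremotewrite

import (
	"github.com/prometheus/prometheus/prompb"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// otelMetricTypeToPromMetricType is a hypothetical helper (not part of this
// patch) that maps each pdata metric type onto an explicit Prometheus
// metadata type instead of relying on a raw enum conversion.
func otelMetricTypeToPromMetricType(metric pmetric.Metric) prompb.MetricMetadata_MetricType {
	switch metric.Type() {
	case pmetric.MetricTypeGauge:
		return prompb.MetricMetadata_GAUGE
	case pmetric.MetricTypeSum:
		// Monotonic cumulative sums correspond to Prometheus counters;
		// anything else is closer to a gauge.
		if metric.Sum().IsMonotonic() &&
			metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
			return prompb.MetricMetadata_COUNTER
		}
		return prompb.MetricMetadata_GAUGE
	case pmetric.MetricTypeHistogram:
		return prompb.MetricMetadata_HISTOGRAM
	case pmetric.MetricTypeExponentialHistogram:
		// Assumption: report exponential histograms as histograms for
		// metadata purposes.
		return prompb.MetricMetadata_HISTOGRAM
	case pmetric.MetricTypeSummary:
		return prompb.MetricMetadata_SUMMARY
	}
	return prompb.MetricMetadata_UNKNOWN
}

With a helper like this, the metadata entry above could set Type: otelMetricTypeToPromMetricType(scopeMetrics.Metrics().At(k)) instead of casting the enum value.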