From c1931e1ad28ff4c0aa7c3508440783a1af5db049 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anne-Elisabeth=20Leli=C3=A8vre?= Date: Tue, 23 Nov 2021 16:08:15 -0500 Subject: [PATCH] `prometheusremotewriteexporter`: Add exemplars support (#5578) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add exemplars support to prometheusremotewriteexporter * fix lint error * fix according to review * add comment in helper_test * change the loop for the exemplars attributes with Range func and use the AsString conversion * change the logic of addExemplars func to add the exemplar with the associated bucket bound Signed-off-by: Anne-Elisabeth Lelièvre --- .../prometheusremotewriteexporter/helper.go | 100 +++++++++++++++- .../helper_test.go | 110 ++++++++++++++++++ .../testutil_test.go | 92 ++++++++++++--- 3 files changed, 281 insertions(+), 21 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index ee5437d6400d..000f4624ac9b 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -17,6 +17,7 @@ package prometheusremotewriteexporter import ( "errors" "log" + "math" "sort" "strconv" "strings" @@ -24,6 +25,7 @@ import ( "unicode" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/model/pdata" ) @@ -39,6 +41,18 @@ const ( keyStr = "key" ) +type bucketBoundsData struct { + sig string + bound float64 +} + +// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds +type byBucketBoundsData []bucketBoundsData + +func (m byBucketBoundsData) Len() int { return len(m) } +func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } +func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } + // ByLabelName enables the usage of sort.Sort() with a slice of labels type ByLabelName []prompb.Label @@ -63,12 +77,13 @@ func validateMetrics(metric pdata.Metric) bool { } // addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it -// creates a new TimeSeries in the map if not found. tsMap is unmodified if either of its parameters is nil. +// creates a new TimeSeries in the map if not found and returns the time series signature. +// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label, - metric pdata.Metric) { + metric pdata.Metric) string { if sample == nil || labels == nil || tsMap == nil { - return + return "" } sig := timeSeriesSignature(metric, &labels) @@ -83,6 +98,44 @@ func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, label } tsMap[sig] = newTs } + + return sig +} + +// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig; +// we only add exemplars if samples are presents +// tsMap is unmodified if either of its parameters is nil and samples are nil. +func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) { + if tsMap == nil || bucketBoundsData == nil || exemplars == nil { + return + } + + sort.Sort(byBucketBoundsData(bucketBoundsData)) + + for _, exemplar := range exemplars { + addExemplar(tsMap, bucketBoundsData, exemplar) + } +} + +func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) { + for _, bucketBound := range bucketBounds { + sig := bucketBound.sig + bound := bucketBound.bound + + _, ok := tsMap[sig] + if ok { + if tsMap[sig].Samples != nil { + if tsMap[sig].Exemplars == nil { + tsMap[sig].Exemplars = make([]prompb.Exemplar, 0) + } + + if exemplar.Value <= bound { + tsMap[sig].Exemplars = append(tsMap[sig].Exemplars, exemplar) + return + } + } + } + } } // timeSeries return a string signature in the form of: @@ -306,6 +359,10 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res // cumulative count for conversion to cumulative histogram var cumulativeCount uint64 + promExemplars := getPromExemplars(pt) + + bucketBounds := make([]bucketBoundsData, 0) + // process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1 for index, bound := range pt.ExplicitBounds() { if index >= len(pt.BucketCounts()) { @@ -318,7 +375,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) labels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, boundStr) - addSample(tsMap, bucket, labels, metric) + sig := addSample(tsMap, bucket, labels, metric) + + bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound}) } // add le=+Inf bucket cumulativeCount += pt.BucketCounts()[len(pt.BucketCounts())-1] @@ -327,7 +386,38 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res Timestamp: time, } infLabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, pInfStr) - addSample(tsMap, infBucket, infLabels, metric) + sig := addSample(tsMap, infBucket, infLabels, metric) + + bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)}) + addExemplars(tsMap, promExemplars, bucketBounds) +} + +func getPromExemplars(pt pdata.HistogramDataPoint) []prompb.Exemplar { + var promExemplars []prompb.Exemplar + + for i := 0; i < pt.Exemplars().Len(); i++ { + exemplar := pt.Exemplars().At(i) + + promExemplar := &prompb.Exemplar{ + Value: exemplar.DoubleVal(), + Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()), + } + + exemplar.FilteredAttributes().Range(func(key string, value pdata.AttributeValue) bool { + promLabel := prompb.Label{ + Name: key, + Value: value.AsString(), + } + + promExemplar.Labels = append(promExemplar.Labels, promLabel) + + return true + }) + + promExemplars = append(promExemplars, *promExemplar) + } + + return promExemplars } // addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index abbe5f2ae170..a05293f8bd98 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -15,8 +15,11 @@ package prometheusremotewriteexporter import ( + "math" "testing" + "time" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/model/pdata" @@ -486,3 +489,110 @@ func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) { } } } + +// Test_addExemplars checks addExemplars updates the map it receives correctly based on the exemplars and bucket bounds data it receives. +func Test_addExemplars(t *testing.T) { + type testCase struct { + exemplars []prompb.Exemplar + bucketBounds []bucketBoundsData + } + + tests := []struct { + name string + orig map[string]*prompb.TimeSeries + testCase []testCase + want map[string]*prompb.TimeSeries + }{ + { + "timeSeries_is_empty", + map[string]*prompb.TimeSeries{}, + []testCase{ + { + []prompb.Exemplar{getExemplar(float64(intVal1), msTime1)}, + getBucketBoundsData([]float64{1, 2, 3}), + }, + }, + map[string]*prompb.TimeSeries{}, + }, + { + "timeSeries_without_sample", + tsWithoutSampleAndExemplar, + []testCase{ + { + []prompb.Exemplar{getExemplar(float64(intVal1), msTime1)}, + getBucketBoundsData([]float64{1, 2, 3}), + }, + }, + tsWithoutSampleAndExemplar, + }, + { + "exemplar_value_less_than_bucket_bound", + map[string]*prompb.TimeSeries{ + lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), msTime1)), + }, + []testCase{ + { + []prompb.Exemplar{getExemplar(floatVal2, msTime1)}, + getBucketBoundsData([]float64{1, 2, 3}), + }, + }, + tsWithSamplesAndExemplars, + }, + { + "infinite_bucket_bound", + map[string]*prompb.TimeSeries{ + lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), msTime1)), + }, + []testCase{ + { + []prompb.Exemplar{getExemplar(math.MaxFloat64, msTime1)}, + getBucketBoundsData([]float64{1, math.Inf(1)}), + }, + }, + tsWithInfiniteBoundExemplarValue, + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + addExemplars(tt.orig, tt.testCase[0].exemplars, tt.testCase[0].bucketBounds) + assert.Exactly(t, tt.want, tt.orig) + }) + } +} + +// Test_getPromExemplars checks if exemplars is not nul and return the prometheus exemplars. +func Test_getPromExemplars(t *testing.T) { + tnow := time.Now() + tests := []struct { + name string + histogram *pdata.HistogramDataPoint + expected []prompb.Exemplar + }{ + { + "with_exemplars", + getHistogramDataPointWithExemplars(tnow, floatVal1, traceIDKey, traceIDValue1), + []prompb.Exemplar{ + { + Value: floatVal1, + Timestamp: timestamp.FromTime(tnow), + Labels: []prompb.Label{getLabel(traceIDKey, traceIDValue1)}, + }, + }, + }, + { + "without_exemplar", + getHistogramDataPoint(), + nil, + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + requests := getPromExemplars(*tt.histogram) + assert.Exactly(t, tt.expected, requests) + }) + } +} diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go index 320d90a6cef7..748acd23d33d 100644 --- a/exporter/prometheusremotewriteexporter/testutil_test.go +++ b/exporter/prometheusremotewriteexporter/testutil_test.go @@ -16,6 +16,7 @@ package prometheusremotewriteexporter import ( "fmt" + "math" "time" "github.com/prometheus/prometheus/prompb" @@ -30,22 +31,24 @@ var ( msTime2 = int64(time2 / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) msTime3 = int64(time3 / uint64(int64(time.Millisecond)/int64(time.Nanosecond))) - label11 = "test_label11" - value11 = "test_value11" - label12 = "test_label12" - value12 = "test_value12" - label21 = "test_label21" - value21 = "test_value21" - label22 = "test_label22" - value22 = "test_value22" - label31 = "test_label31" - value31 = "test_value31" - label32 = "test_label32" - value32 = "test_value32" - label41 = "__test_label41__" - value41 = "test_value41" - dirty1 = "%" - dirty2 = "?" + label11 = "test_label11" + value11 = "test_value11" + label12 = "test_label12" + value12 = "test_value12" + label21 = "test_label21" + value21 = "test_value21" + label22 = "test_label22" + value22 = "test_value22" + label31 = "test_label31" + value31 = "test_value31" + label32 = "test_label32" + value32 = "test_value32" + label41 = "__test_label41__" + value41 = "test_value41" + dirty1 = "%" + dirty2 = "?" + traceIDValue1 = "traceID-value1" + traceIDKey = "trace_id" intVal1 int64 = 1 intVal2 int64 = 2 @@ -78,6 +81,20 @@ var ( "Gauge" + "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22: getTimeSeries(getPromLabels(label21, value21, label22, value22), getSample(float64(intVal1), msTime2)), } + tsWithSamplesAndExemplars = map[string]*prompb.TimeSeries{ + lb1Sig: getTimeSeriesWithSamplesAndExemplars(getPromLabels(label11, value11, label12, value12), + []prompb.Sample{getSample(float64(intVal1), msTime1)}, + []prompb.Exemplar{getExemplar(floatVal2, msTime1)}), + } + tsWithInfiniteBoundExemplarValue = map[string]*prompb.TimeSeries{ + lb1Sig: getTimeSeriesWithSamplesAndExemplars(getPromLabels(label11, value11, label12, value12), + []prompb.Sample{getSample(float64(intVal1), msTime1)}, + []prompb.Exemplar{getExemplar(math.MaxFloat64, msTime1)}), + } + tsWithoutSampleAndExemplar = map[string]*prompb.TimeSeries{ + lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12), + nil...), + } bounds = []float64{0.1, 0.5, 0.99} buckets = []uint64{1, 2, 3} @@ -184,6 +201,49 @@ func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) *prompb.Time } } +func getExemplar(v float64, t int64) prompb.Exemplar { + return prompb.Exemplar{ + Value: v, + Timestamp: t, + Labels: []prompb.Label{getLabel(traceIDKey, traceIDValue1)}, + } +} + +func getBucketBoundsData(values []float64) []bucketBoundsData { + var b []bucketBoundsData + + for _, value := range values { + b = append(b, bucketBoundsData{sig: lb1Sig, bound: value}) + } + + return b +} + +func getTimeSeriesWithSamplesAndExemplars(labels []prompb.Label, samples []prompb.Sample, exemplars []prompb.Exemplar) *prompb.TimeSeries { + return &prompb.TimeSeries{ + Labels: labels, + Samples: samples, + Exemplars: exemplars, + } +} + +func getHistogramDataPointWithExemplars(time time.Time, value float64, attributeKey string, attributeValue string) *pdata.HistogramDataPoint { + h := pdata.NewHistogramDataPoint() + + e := h.Exemplars().AppendEmpty() + e.SetDoubleVal(value) + e.SetTimestamp(pdata.NewTimestampFromTime(time)) + e.FilteredAttributes().Insert(attributeKey, pdata.NewAttributeValueString(attributeValue)) + + return &h +} + +func getHistogramDataPoint() *pdata.HistogramDataPoint { + h := pdata.NewHistogramDataPoint() + + return &h +} + func getQuantiles(bounds []float64, values []float64) pdata.ValueAtQuantileSlice { quantiles := pdata.NewValueAtQuantileSlice() quantiles.EnsureCapacity(len(bounds))