Skip to content

Commit

Permalink
prometheusremotewriteexporter: Add exemplars support (#5578)
Browse files Browse the repository at this point in the history
* add exemplars support to prometheusremotewriteexporter
* fix lint error
* fix according to review
* add comment in helper_test
* change the loop for the exemplars attributes with Range func and use the AsString conversion
* change the logic of addExemplars func to add the exemplar with the associated bucket bound

Signed-off-by: Anne-Elisabeth Lelièvre <[email protected]>
  • Loading branch information
anneelisabethlelievre authored Nov 23, 2021
1 parent 73cd3e7 commit c1931e1
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 21 deletions.
100 changes: 95 additions & 5 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package prometheusremotewriteexporter
import (
"errors"
"log"
"math"
"sort"
"strconv"
"strings"
"time"
"unicode"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/model/pdata"
)
Expand All @@ -39,6 +41,18 @@ const (
keyStr = "key"
)

type bucketBoundsData struct {
sig string
bound float64
}

// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds
type byBucketBoundsData []bucketBoundsData

func (m byBucketBoundsData) Len() int { return len(m) }
func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }

// ByLabelName enables the usage of sort.Sort() with a slice of labels
type ByLabelName []prompb.Label

Expand All @@ -63,12 +77,13 @@ func validateMetrics(metric pdata.Metric) bool {
}

// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it
// creates a new TimeSeries in the map if not found. tsMap is unmodified if either of its parameters is nil.
// creates a new TimeSeries in the map if not found and returns the time series signature.
// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil.
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label,
metric pdata.Metric) {
metric pdata.Metric) string {

if sample == nil || labels == nil || tsMap == nil {
return
return ""
}

sig := timeSeriesSignature(metric, &labels)
Expand All @@ -83,6 +98,44 @@ func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, label
}
tsMap[sig] = newTs
}

return sig
}

// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig;
// we only add exemplars if samples are presents
// tsMap is unmodified if either of its parameters is nil and samples are nil.
func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) {
if tsMap == nil || bucketBoundsData == nil || exemplars == nil {
return
}

sort.Sort(byBucketBoundsData(bucketBoundsData))

for _, exemplar := range exemplars {
addExemplar(tsMap, bucketBoundsData, exemplar)
}
}

func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) {
for _, bucketBound := range bucketBounds {
sig := bucketBound.sig
bound := bucketBound.bound

_, ok := tsMap[sig]
if ok {
if tsMap[sig].Samples != nil {
if tsMap[sig].Exemplars == nil {
tsMap[sig].Exemplars = make([]prompb.Exemplar, 0)
}

if exemplar.Value <= bound {
tsMap[sig].Exemplars = append(tsMap[sig].Exemplars, exemplar)
return
}
}
}
}
}

// timeSeries return a string signature in the form of:
Expand Down Expand Up @@ -306,6 +359,10 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
// cumulative count for conversion to cumulative histogram
var cumulativeCount uint64

promExemplars := getPromExemplars(pt)

bucketBounds := make([]bucketBoundsData, 0)

// process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1
for index, bound := range pt.ExplicitBounds() {
if index >= len(pt.BucketCounts()) {
Expand All @@ -318,7 +375,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
}
boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
labels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
addSample(tsMap, bucket, labels, metric)
sig := addSample(tsMap, bucket, labels, metric)

bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound})
}
// add le=+Inf bucket
cumulativeCount += pt.BucketCounts()[len(pt.BucketCounts())-1]
Expand All @@ -327,7 +386,38 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Timestamp: time,
}
infLabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
addSample(tsMap, infBucket, infLabels, metric)
sig := addSample(tsMap, infBucket, infLabels, metric)

bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)})
addExemplars(tsMap, promExemplars, bucketBounds)
}

func getPromExemplars(pt pdata.HistogramDataPoint) []prompb.Exemplar {
var promExemplars []prompb.Exemplar

for i := 0; i < pt.Exemplars().Len(); i++ {
exemplar := pt.Exemplars().At(i)

promExemplar := &prompb.Exemplar{
Value: exemplar.DoubleVal(),
Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()),
}

exemplar.FilteredAttributes().Range(func(key string, value pdata.AttributeValue) bool {
promLabel := prompb.Label{
Name: key,
Value: value.AsString(),
}

promExemplar.Labels = append(promExemplar.Labels, promLabel)

return true
})

promExemplars = append(promExemplars, *promExemplar)
}

return promExemplars
}

// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples.
Expand Down
110 changes: 110 additions & 0 deletions exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package prometheusremotewriteexporter

import (
"math"
"testing"
"time"

"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -486,3 +489,110 @@ func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) {
}
}
}

// Test_addExemplars checks addExemplars updates the map it receives correctly based on the exemplars and bucket bounds data it receives.
func Test_addExemplars(t *testing.T) {
type testCase struct {
exemplars []prompb.Exemplar
bucketBounds []bucketBoundsData
}

tests := []struct {
name string
orig map[string]*prompb.TimeSeries
testCase []testCase
want map[string]*prompb.TimeSeries
}{
{
"timeSeries_is_empty",
map[string]*prompb.TimeSeries{},
[]testCase{
{
[]prompb.Exemplar{getExemplar(float64(intVal1), msTime1)},
getBucketBoundsData([]float64{1, 2, 3}),
},
},
map[string]*prompb.TimeSeries{},
},
{
"timeSeries_without_sample",
tsWithoutSampleAndExemplar,
[]testCase{
{
[]prompb.Exemplar{getExemplar(float64(intVal1), msTime1)},
getBucketBoundsData([]float64{1, 2, 3}),
},
},
tsWithoutSampleAndExemplar,
},
{
"exemplar_value_less_than_bucket_bound",
map[string]*prompb.TimeSeries{
lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12),
getSample(float64(intVal1), msTime1)),
},
[]testCase{
{
[]prompb.Exemplar{getExemplar(floatVal2, msTime1)},
getBucketBoundsData([]float64{1, 2, 3}),
},
},
tsWithSamplesAndExemplars,
},
{
"infinite_bucket_bound",
map[string]*prompb.TimeSeries{
lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12),
getSample(float64(intVal1), msTime1)),
},
[]testCase{
{
[]prompb.Exemplar{getExemplar(math.MaxFloat64, msTime1)},
getBucketBoundsData([]float64{1, math.Inf(1)}),
},
},
tsWithInfiniteBoundExemplarValue,
},
}
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addExemplars(tt.orig, tt.testCase[0].exemplars, tt.testCase[0].bucketBounds)
assert.Exactly(t, tt.want, tt.orig)
})
}
}

// Test_getPromExemplars checks if exemplars is not nul and return the prometheus exemplars.
func Test_getPromExemplars(t *testing.T) {
tnow := time.Now()
tests := []struct {
name string
histogram *pdata.HistogramDataPoint
expected []prompb.Exemplar
}{
{
"with_exemplars",
getHistogramDataPointWithExemplars(tnow, floatVal1, traceIDKey, traceIDValue1),
[]prompb.Exemplar{
{
Value: floatVal1,
Timestamp: timestamp.FromTime(tnow),
Labels: []prompb.Label{getLabel(traceIDKey, traceIDValue1)},
},
},
},
{
"without_exemplar",
getHistogramDataPoint(),
nil,
},
}
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests := getPromExemplars(*tt.histogram)
assert.Exactly(t, tt.expected, requests)
})
}
}
Loading

0 comments on commit c1931e1

Please sign in to comment.