diff --git a/CHANGELOG.md b/CHANGELOG.md index 30ef5295d46f..d7460dcff6f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +## 💡 Enhancements 💡 + +- `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679) + ## 🛑 Breaking changes 🛑 - `memcachedreceiver`: Update metric names (#6594) diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 181477bff136..43dd4db7b0e0 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -25,6 +25,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -380,7 +381,20 @@ func Test_PushMetrics(t *testing.T) { emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary]) - checkFunc := func(t *testing.T, r *http.Request, expected int) { + // staleNaN cases + staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram]) + + staleNaNSummaryBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSummary]) + + staleNaNIntGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntGauge]) + + staleNaNDoubleGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNDoubleGauge]) + + staleNaNIntSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntSum]) + + staleNaNSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSum]) + + checkFunc := func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) { body, err := ioutil.ReadAll(r.Body) if err != nil { t.Fatal(err) @@ -397,15 +411,19 @@ func Test_PushMetrics(t *testing.T) { ok := proto.Unmarshal(dest, wr) require.Nil(t, ok) assert.EqualValues(t, expected, len(wr.Timeseries)) + if isStaleMarker { + assert.True(t, value.IsStaleNaN(wr.Timeseries[0].Samples[0].Value)) + } } tests := []struct { name string md *pdata.Metrics - reqTestFunc func(t *testing.T, r *http.Request, expected int) + reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) expectedTimeSeries int httpResponseCode int returnErr bool + isStaleMarker bool }{ { "invalid_type_case", @@ -414,6 +432,7 @@ func Test_PushMetrics(t *testing.T) { 0, http.StatusAccepted, true, + false, }, { "intSum_case", @@ -422,6 +441,7 @@ func Test_PushMetrics(t *testing.T) { 2, http.StatusAccepted, false, + false, }, { "doubleSum_case", @@ -430,6 +450,7 @@ func Test_PushMetrics(t *testing.T) { 2, http.StatusAccepted, false, + false, }, { "doubleGauge_case", @@ -438,6 +459,7 @@ func Test_PushMetrics(t *testing.T) { 2, http.StatusAccepted, false, + false, }, { "intGauge_case", @@ -446,6 +468,7 @@ func Test_PushMetrics(t *testing.T) { 2, http.StatusAccepted, false, + false, }, { "histogram_case", @@ -454,6 +477,7 @@ func Test_PushMetrics(t *testing.T) { 12, http.StatusAccepted, false, + false, }, { "summary_case", @@ -462,6 +486,7 @@ func Test_PushMetrics(t *testing.T) { 10, http.StatusAccepted, false, + false, }, { "unmatchedBoundBucketHist_case", @@ -470,6 +495,7 @@ func Test_PushMetrics(t *testing.T) { 5, http.StatusAccepted, false, + false, }, { "5xx_case", @@ -478,6 +504,7 @@ func Test_PushMetrics(t *testing.T) { 5, http.StatusServiceUnavailable, true, + false, }, { "emptyGauge_case", @@ -486,6 +513,7 @@ func Test_PushMetrics(t *testing.T) { 0, http.StatusAccepted, true, + false, }, { "emptyCumulativeSum_case", @@ -494,6 +522,7 @@ func Test_PushMetrics(t *testing.T) { 0, http.StatusAccepted, true, + false, }, { "emptyCumulativeHistogram_case", @@ -502,6 +531,7 @@ func Test_PushMetrics(t *testing.T) { 0, http.StatusAccepted, true, + false, }, { "emptySummary_case", @@ -510,6 +540,61 @@ func Test_PushMetrics(t *testing.T) { 0, http.StatusAccepted, true, + false, + }, + { + "staleNaNIntGauge_case", + &staleNaNIntGaugeBatch, + checkFunc, + 1, + http.StatusAccepted, + false, + true, + }, + { + "staleNaNDoubleGauge_case", + &staleNaNDoubleGaugeBatch, + checkFunc, + 1, + http.StatusAccepted, + false, + true, + }, + { + "staleNaNIntSum_case", + &staleNaNIntSumBatch, + checkFunc, + 1, + http.StatusAccepted, + false, + true, + }, + { + "staleNaNSum_case", + &staleNaNSumBatch, + checkFunc, + 1, + http.StatusAccepted, + false, + true, + }, + { + "staleNaNHistogram_case", + &staleNaNHistogramBatch, + checkFunc, + 6, + http.StatusAccepted, + false, + true, + }, + { + "staleNaNSummary_case", + &staleNaNSummaryBatch, + checkFunc, + 5, + http.StatusAccepted, + false, + true, }, } @@ -517,7 +602,7 @@ func Test_PushMetrics(t *testing.T) { t.Run(tt.name, func(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if tt.reqTestFunc != nil { - tt.reqTestFunc(t, r, tt.expectedTimeSeries) + tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker) } w.WriteHeader(tt.httpResponseCode) })) diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index 59166e2dd047..03e79d9f7bcf 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/model/pdata" ) @@ -329,6 +330,9 @@ func addSingleNumberDataPoint(pt pdata.NumberDataPoint, resource pdata.Resource, case pdata.MetricValueTypeDouble: sample.Value = pt.DoubleVal() } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + sample.Value = math.Float64frombits(value.StaleNaN) + } addSample(tsMap, sample, labels, metric) } @@ -344,6 +348,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res Value: pt.Sum(), Timestamp: time, } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + sum.Value = math.Float64frombits(value.StaleNaN) + } sumlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+sumStr) addSample(tsMap, sum, sumlabels, metric) @@ -353,6 +360,10 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res Value: float64(pt.Count()), Timestamp: time, } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + count.Value = math.Float64frombits(value.StaleNaN) + } + countlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+countStr) addSample(tsMap, count, countlabels, metric) @@ -373,6 +384,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res Value: float64(cumulativeCount), Timestamp: time, } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + bucket.Value = math.Float64frombits(value.StaleNaN) + } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) labels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, boundStr) sig := addSample(tsMap, bucket, labels, metric) @@ -385,6 +399,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res Value: float64(cumulativeCount), Timestamp: time, } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + infBucket.Value = math.Float64frombits(value.StaleNaN) + } infLabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, pInfStr) sig := addSample(tsMap, infBucket, infLabels, metric) @@ -431,7 +448,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc Value: pt.Sum(), Timestamp: time, } - + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + sum.Value = math.Float64frombits(value.StaleNaN) + } sumlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+sumStr) addSample(tsMap, sum, sumlabels, metric) @@ -440,6 +459,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc Value: float64(pt.Count()), Timestamp: time, } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + count.Value = math.Float64frombits(value.StaleNaN) + } countlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+countStr) addSample(tsMap, count, countlabels, metric) @@ -450,6 +472,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc Value: qt.Value(), Timestamp: time, } + if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + quantile.Value = math.Float64frombits(value.StaleNaN) + } percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) qtlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName, quantileStr, percentileStr) addSample(tsMap, quantile, qtlabels, metric) diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go index 748acd23d33d..0c5c295be410 100644 --- a/exporter/prometheusremotewriteexporter/testutil_test.go +++ b/exporter/prometheusremotewriteexporter/testutil_test.go @@ -17,6 +17,7 @@ package prometheusremotewriteexporter import ( "fmt" "math" + "strings" "time" "github.com/prometheus/prometheus/prompb" @@ -157,6 +158,22 @@ var ( emptyCumulativeSum: getEmptyCumulativeSumMetric(emptyCumulativeSum), emptyCumulativeHistogram: getEmptyCumulativeHistogramMetric(emptyCumulativeHistogram), } + staleNaNIntGauge = "staleNaNIntGauge" + staleNaNDoubleGauge = "staleNaNDoubleGauge" + staleNaNIntSum = "staleNaNIntSum" + staleNaNSum = "staleNaNSum" + staleNaNHistogram = "staleNaNHistogram" + staleNaNSummary = "staleNaNSummary" + + // staleNaN metrics as input should have the staleness marker flag + staleNaNMetrics = map[string]pdata.Metric{ + staleNaNIntGauge: getIntGaugeMetric(staleNaNIntGauge, lbs1, intVal1, time1), + staleNaNDoubleGauge: getDoubleGaugeMetric(staleNaNDoubleGauge, lbs1, floatVal1, time1), + staleNaNIntSum: getIntSumMetric(staleNaNIntSum, lbs1, intVal1, time1), + staleNaNSum: getSumMetric(staleNaNSum, lbs1, floatVal1, time1), + staleNaNHistogram: getHistogramMetric(staleNaNHistogram, lbs1, time1, floatVal2, uint64(intVal2), bounds, buckets), + staleNaNSummary: getSummaryMetric(staleNaNSummary, lbs2, time2, floatVal2, uint64(intVal2), quantiles), + } ) // OTLP metrics @@ -277,6 +294,9 @@ func getIntGaugeMetric(name string, attributes pdata.AttributeMap, value int64, metric.SetName(name) metric.SetDataType(pdata.MetricDataTypeGauge) dp := metric.Gauge().DataPoints().AppendEmpty() + if strings.HasPrefix(name, "staleNaN") { + dp.SetFlags(1) + } dp.SetIntVal(value) attributes.CopyTo(dp.Attributes()) @@ -290,6 +310,9 @@ func getDoubleGaugeMetric(name string, attributes pdata.AttributeMap, value floa metric.SetName(name) metric.SetDataType(pdata.MetricDataTypeGauge) dp := metric.Gauge().DataPoints().AppendEmpty() + if strings.HasPrefix(name, "staleNaN") { + dp.SetFlags(1) + } dp.SetDoubleVal(value) attributes.CopyTo(dp.Attributes()) @@ -311,6 +334,9 @@ func getIntSumMetric(name string, attributes pdata.AttributeMap, value int64, ts metric.SetDataType(pdata.MetricDataTypeSum) metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) dp := metric.Sum().DataPoints().AppendEmpty() + if strings.HasPrefix(name, "staleNaN") { + dp.SetFlags(1) + } dp.SetIntVal(value) attributes.CopyTo(dp.Attributes()) @@ -333,6 +359,9 @@ func getSumMetric(name string, attributes pdata.AttributeMap, value float64, ts metric.SetDataType(pdata.MetricDataTypeSum) metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) dp := metric.Sum().DataPoints().AppendEmpty() + if strings.HasPrefix(name, "staleNaN") { + dp.SetFlags(1) + } dp.SetDoubleVal(value) attributes.CopyTo(dp.Attributes()) @@ -362,6 +391,9 @@ func getHistogramMetric(name string, attributes pdata.AttributeMap, ts uint64, s metric.SetDataType(pdata.MetricDataTypeHistogram) metric.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) dp := metric.Histogram().DataPoints().AppendEmpty() + if strings.HasPrefix(name, "staleNaN") { + dp.SetFlags(1) + } dp.SetCount(count) dp.SetSum(sum) dp.SetBucketCounts(buckets) @@ -384,9 +416,11 @@ func getSummaryMetric(name string, attributes pdata.AttributeMap, ts uint64, sum metric.SetName(name) metric.SetDataType(pdata.MetricDataTypeSummary) dp := metric.Summary().DataPoints().AppendEmpty() + if strings.HasPrefix(name, "staleNaN") { + dp.SetFlags(1) + } dp.SetCount(count) dp.SetSum(sum) - attributes.Range(func(k string, v pdata.AttributeValue) bool { dp.Attributes().Upsert(k, v) return true