Skip to content

Commit

Permalink
[Prometheus Remote Write Exporter] Handling Staleness flag from OTLP. (
Browse files Browse the repository at this point in the history
…#6679)

* implement to handle staleness marker flag and added test cases for gauge and sum metric data types

* feat: handling staleness marker for histogram and summary

* modify test case implementation

* feat: add test cases for histogram and summary

* fix: lint error

* fix: impi error

* fix: remove else statement for the readability

Co-authored-by: James Park <[email protected]>
  • Loading branch information
Hyunuk Lim and JamesJHPark authored Dec 14, 2021
1 parent 60440ca commit 21a6d99
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 💡 Enhancements 💡

- `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679)

## 🛑 Breaking changes 🛑

- `memcachedreceiver`: Update metric names (#6594)
Expand Down
91 changes: 88 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -380,7 +381,20 @@ func Test_PushMetrics(t *testing.T) {

emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])

checkFunc := func(t *testing.T, r *http.Request, expected int) {
// staleNaN cases
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])

staleNaNSummaryBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSummary])

staleNaNIntGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntGauge])

staleNaNDoubleGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNDoubleGauge])

staleNaNIntSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntSum])

staleNaNSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSum])

checkFunc := func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
Expand All @@ -397,15 +411,19 @@ func Test_PushMetrics(t *testing.T) {
ok := proto.Unmarshal(dest, wr)
require.Nil(t, ok)
assert.EqualValues(t, expected, len(wr.Timeseries))
if isStaleMarker {
assert.True(t, value.IsStaleNaN(wr.Timeseries[0].Samples[0].Value))
}
}

tests := []struct {
name string
md *pdata.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int)
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
}{
{
"invalid_type_case",
Expand All @@ -414,6 +432,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"intSum_case",
Expand All @@ -422,6 +441,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"doubleSum_case",
Expand All @@ -430,6 +450,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"doubleGauge_case",
Expand All @@ -438,6 +459,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"intGauge_case",
Expand All @@ -446,6 +468,7 @@ func Test_PushMetrics(t *testing.T) {
2,
http.StatusAccepted,
false,
false,
},
{
"histogram_case",
Expand All @@ -454,6 +477,7 @@ func Test_PushMetrics(t *testing.T) {
12,
http.StatusAccepted,
false,
false,
},
{
"summary_case",
Expand All @@ -462,6 +486,7 @@ func Test_PushMetrics(t *testing.T) {
10,
http.StatusAccepted,
false,
false,
},
{
"unmatchedBoundBucketHist_case",
Expand All @@ -470,6 +495,7 @@ func Test_PushMetrics(t *testing.T) {
5,
http.StatusAccepted,
false,
false,
},
{
"5xx_case",
Expand All @@ -478,6 +504,7 @@ func Test_PushMetrics(t *testing.T) {
5,
http.StatusServiceUnavailable,
true,
false,
},
{
"emptyGauge_case",
Expand All @@ -486,6 +513,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"emptyCumulativeSum_case",
Expand All @@ -494,6 +522,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"emptyCumulativeHistogram_case",
Expand All @@ -502,6 +531,7 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"emptySummary_case",
Expand All @@ -510,14 +540,69 @@ func Test_PushMetrics(t *testing.T) {
0,
http.StatusAccepted,
true,
false,
},
{
"staleNaNIntGauge_case",
&staleNaNIntGaugeBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNDoubleGauge_case",
&staleNaNDoubleGaugeBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNIntSum_case",
&staleNaNIntSumBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNSum_case",
&staleNaNSumBatch,
checkFunc,
1,
http.StatusAccepted,
false,
true,
},
{
"staleNaNHistogram_case",
&staleNaNHistogramBatch,
checkFunc,
6,
http.StatusAccepted,
false,
true,
},
{
"staleNaNSummary_case",
&staleNaNSummaryBatch,
checkFunc,
5,
http.StatusAccepted,
false,
true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tt.reqTestFunc != nil {
tt.reqTestFunc(t, r, tt.expectedTimeSeries)
tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
}
w.WriteHeader(tt.httpResponseCode)
}))
Expand Down
27 changes: 26 additions & 1 deletion exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/model/pdata"
)
Expand Down Expand Up @@ -329,6 +330,9 @@ func addSingleNumberDataPoint(pt pdata.NumberDataPoint, resource pdata.Resource,
case pdata.MetricValueTypeDouble:
sample.Value = pt.DoubleVal()
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
sample.Value = math.Float64frombits(value.StaleNaN)
}
addSample(tsMap, sample, labels, metric)
}

Expand All @@ -344,6 +348,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: pt.Sum(),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
sum.Value = math.Float64frombits(value.StaleNaN)
}

sumlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, metric)
Expand All @@ -353,6 +360,10 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: float64(pt.Count()),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
count.Value = math.Float64frombits(value.StaleNaN)
}

countlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, metric)

Expand All @@ -373,6 +384,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: float64(cumulativeCount),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
bucket.Value = math.Float64frombits(value.StaleNaN)
}
boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
labels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
sig := addSample(tsMap, bucket, labels, metric)
Expand All @@ -385,6 +399,9 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
Value: float64(cumulativeCount),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
infBucket.Value = math.Float64frombits(value.StaleNaN)
}
infLabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
sig := addSample(tsMap, infBucket, infLabels, metric)

Expand Down Expand Up @@ -431,7 +448,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc
Value: pt.Sum(),
Timestamp: time,
}

if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
sum.Value = math.Float64frombits(value.StaleNaN)
}
sumlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, metric)

Expand All @@ -440,6 +459,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc
Value: float64(pt.Count()),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
count.Value = math.Float64frombits(value.StaleNaN)
}
countlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, metric)

Expand All @@ -450,6 +472,9 @@ func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resourc
Value: qt.Value(),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
quantile.Value = math.Float64frombits(value.StaleNaN)
}
percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64)
qtlabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName, quantileStr, percentileStr)
addSample(tsMap, quantile, qtlabels, metric)
Expand Down
Loading

0 comments on commit 21a6d99

Please sign in to comment.