Skip to content

Commit

Permalink
modifies otlp_metric_adjuster to support datapoint flags for stalenes…
Browse files Browse the repository at this point in the history
…s markers (#6696)
  • Loading branch information
PaurushGarg authored Dec 13, 2021
1 parent 4d6d046 commit 10c911a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 6 deletions.
27 changes: 21 additions & 6 deletions receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pdata.Metric) (re
currentDist := currentPoints.At(i)
tsi := ma.tsm.get(current, currentDist.Attributes())
previous := tsi.previous
tsi.previous = current
if !currentDist.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
tsi.previous = current
}
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
Expand All @@ -349,7 +351,10 @@ func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pdata.Metric) (re
resets++
continue
}

if currentDist.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
currentDist.SetStartTimestamp(initialPoints.At(i).StartTimestamp())
continue
}
previousDist := previousPoints.At(i)
if currentDist.Count() < previousDist.Count() || currentDist.Sum() < previousDist.Sum() {
// reset detected
Expand All @@ -370,7 +375,9 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pdata.Metric) (resets i
currentSum := currentPoints.At(i)
tsi := ma.tsm.get(current, currentSum.Attributes())
previous := tsi.previous
tsi.previous = current
if !currentSum.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
tsi.previous = current
}
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
Expand All @@ -388,7 +395,10 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pdata.Metric) (resets i
resets++
continue
}

if currentSum.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
currentSum.SetStartTimestamp(initialPoints.At(i).StartTimestamp())
continue
}
previousSum := previousPoints.At(i)
if currentSum.DoubleVal() < previousSum.DoubleVal() {
// reset detected
Expand All @@ -410,7 +420,9 @@ func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pdata.Metric) (rese
currentSummary := currentPoints.At(i)
tsi := ma.tsm.get(current, currentSummary.Attributes())
previous := tsi.previous
tsi.previous = current
if !currentSummary.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
tsi.previous = current
}
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
Expand All @@ -428,7 +440,10 @@ func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pdata.Metric) (rese
resets++
continue
}

if currentSummary.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
currentSummary.SetStartTimestamp(initialPoints.At(i).StartTimestamp())
continue
}
previousSummary := previousPoints.At(i)
if (currentSummary.Count() != 0 &&
previousSummary.Count() != 0 &&
Expand Down
142 changes: 142 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,40 @@ func Test_cumulative_pdata(t *testing.T) {
}(),
0,
},
{
"Cumulative: round 5 - instance adjusted based on round 4",
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(pdt5Ms)
pt0.Attributes().InsertString("k1", "v1")
pt0.Attributes().InsertString("k2", "v2")
pt0.SetTimestamp(pdt5Ms)
pt0.SetFlags(1)

return &mL
}(),
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(pdt3Ms)
pt0.Attributes().InsertString("k1", "v1")
pt0.Attributes().InsertString("k2", "v2")
pt0.SetTimestamp(pdt5Ms)
pt0.SetFlags(1)

return &mL
}(),
0,
},
}
runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}
Expand Down Expand Up @@ -416,6 +450,73 @@ func Test_summary_no_count_pdata(t *testing.T) {
runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_summary_flag_norecordedvalue(t *testing.T) {
script := []*metricsAdjusterTestPdata{
{
"Summary No Count: round 1 - initial instance, start time is established",
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSummary)
m0.SetName("summary1")
s0 := m0.Summary()
pt0 := s0.DataPoints().AppendEmpty()
pt0.Attributes().InsertString("v1", "v2")
pt0.SetStartTimestamp(pdt1Ms)
pt0.SetTimestamp(pdt1Ms)
populateSummary(&pt0, pdt1Ms, 10, 40, percent0, []float64{1, 5, 8})
return &mL
}(),
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSummary)
m0.SetName("summary1")
s0 := m0.Summary()
pt0 := s0.DataPoints().AppendEmpty()
pt0.Attributes().InsertString("v1", "v2")
pt0.SetStartTimestamp(pdt1Ms)
pt0.SetTimestamp(pdt1Ms)
populateSummary(&pt0, pdt1Ms, 10, 40, percent0, []float64{1, 5, 8})
return &mL
}(),
1,
},
{
"Summary Flag NoRecordedValue: round 2 - instance adjusted based on round 1",
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSummary)
m0.SetName("summary1")
s0 := m0.Summary()
pt0 := s0.DataPoints().AppendEmpty()
pt0.Attributes().InsertString("v1", "v2")
pt0.SetStartTimestamp(pdt2Ms)
pt0.SetTimestamp(pdt2Ms)
pt0.SetFlags(1)
return &mL
}(),
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSummary)
m0.SetName("summary1")
s0 := m0.Summary()
pt0 := s0.DataPoints().AppendEmpty()
pt0.Attributes().InsertString("v1", "v2")
pt0.SetStartTimestamp(pdt1Ms)
pt0.SetTimestamp(pdt2Ms)
pt0.SetFlags(1)
return &mL
}(),
0,
},
}

runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_summary_pdata(t *testing.T) {
script := []*metricsAdjusterTestPdata{
{
Expand Down Expand Up @@ -526,6 +627,47 @@ func Test_cumulativeDistribution_pdata(t *testing.T) {
runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_histogram_flag_norecordedvalue(t *testing.T) {
script := []*metricsAdjusterTestPdata{
{
"Histogram: round 1 - initial instance, start time is established",
metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{7, 4, 2, 12}))),
metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{7, 4, 2, 12}))),
1,
},
{
"Histogram: round 2 - instance adjusted based on round 1",
func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeHistogram)
histogram := metric.Histogram()
histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
destPointL := histogram.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt2Ms)
dp.SetFlags(1)
return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt2Ms, &dp))
}(),
func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeHistogram)
histogram := metric.Histogram()
histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
destPointL := histogram.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt2Ms)
dp.SetFlags(1)
return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, &dp))
}(),
0,
},
}

runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_multiMetrics_pdata(t *testing.T) {
g1 := "gauge1"
script := []*metricsAdjusterTestPdata{
Expand Down

0 comments on commit 10c911a

Please sign in to comment.