Skip to content

Commit

Permalink
Delete old metrics_adjuster and translate cumulative tests
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Aug 5, 2021
1 parent 4451b14 commit b7566bb
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 41 deletions.
34 changes: 12 additions & 22 deletions receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,23 +246,6 @@ func (ma *MetricsAdjusterPdata) adjustMetricPoints(metric *pdata.Metric) int {
ma.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
return 0
}

/*
resets := 0
filtered := make([]*metricspb.TimeSeries, 0, len(metric.GetTimeseries()))
for _, current := range metric.GetTimeseries() {
tsi := ma.tsm.get(metric, current.GetLabelValues())
if tsi.initial == nil || !ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, tsi.previous) {
// initial || reset timeseries
tsi.initial = current
resets++
}
tsi.previous = current
filtered = append(filtered, current)
}
metric.Timeseries = filtered
return resets
*/
}

// Returns true if 'current' was adjusted and false if 'current' is an the initial occurrence or a
Expand All @@ -273,13 +256,15 @@ func (ma *MetricsAdjusterPdata) adjustMetricGauge(current *pdata.Metric) (resets
for i := 0; i < currentPoints.Len(); i++ {
currentGauge := currentPoints.At(i)
tsi := ma.tsm.get(current, currentGauge.LabelsMap())
previous := tsi.previous
tsi.previous = current
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
}
initialPoints := tsi.initial.Gauge().DataPoints()
previousPoints := tsi.previous.Gauge().DataPoints()
previousPoints := previous.Gauge().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
ma.logger.Info("Adjusting Points, all lengths should be equal",
zap.Int("len(current)", currentPoints.Len()),
Expand Down Expand Up @@ -311,14 +296,16 @@ func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pdata.Metric) (re
for i := 0; i < currentPoints.Len(); i++ {
currentDist := currentPoints.At(i)
tsi := ma.tsm.get(current, currentDist.LabelsMap())
previous := tsi.previous
tsi.previous = current
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}
initialPoints := tsi.initial.Histogram().DataPoints()
previousPoints := tsi.previous.Histogram().DataPoints()
previousPoints := previous.Histogram().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
ma.logger.Info("Adjusting Points, all lengths should be equal",
zap.Int("len(current)", currentPoints.Len()),
Expand Down Expand Up @@ -349,14 +336,16 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pdata.Metric) (resets i
for i := 0; i < currentPoints.Len(); i++ {
currentSum := currentPoints.At(i)
tsi := ma.tsm.get(current, currentSum.LabelsMap())
previous := tsi.previous
tsi.previous = current
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}
initialPoints := tsi.initial.Sum().DataPoints()
previousPoints := tsi.previous.Sum().DataPoints()
previousPoints := previous.Sum().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
ma.logger.Info("Adjusting Points, all lengths should be equal",
zap.Int("len(current)", currentPoints.Len()),
Expand All @@ -374,7 +363,6 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pdata.Metric) (resets i
resets++
continue
}

initialSum := initialPoints.At(i)
currentSum.SetStartTimestamp(initialSum.StartTimestamp())
}
Expand All @@ -388,14 +376,16 @@ func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pdata.Metric) (rese
for i := 0; i < currentPoints.Len(); i++ {
currentSummary := currentPoints.At(i)
tsi := ma.tsm.get(current, currentSummary.LabelsMap())
previous := tsi.previous
tsi.previous = current
if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}
initialPoints := tsi.initial.Summary().DataPoints()
previousPoints := tsi.previous.Summary().DataPoints()
previousPoints := previous.Summary().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
ma.logger.Info("Adjusting Points, all lengths should be equal",
zap.Int("len(current)", currentPoints.Len()),
Expand Down
122 changes: 103 additions & 19 deletions receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
t1Ms = pdata.Timestamp(time.Unix(0, 1000000).UnixNano())
t2Ms = pdata.Timestamp(time.Unix(0, 2000000).UnixNano())
t3Ms = pdata.Timestamp(time.Unix(0, 3000000).UnixNano())
t4Ms = pdata.Timestamp(time.Unix(0, 5000000).UnixNano())
)

func Test_gauge(t *testing.T) {
Expand Down Expand Up @@ -170,24 +171,108 @@ func Test_cumulative(t *testing.T) {
}(),
1,
},
/*
{
"Cumulative: round 2 - instance adjusted based on round 1",
[]*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.Double(t2Ms, 66)))},
[]*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 66)))},
0,
}, {
"Cumulative: round 3 - instance reset (value less than previous value), start time is reset",
[]*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55)))},
[]*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55)))},
1,
}, {
"Cumulative: round 4 - instance adjusted based on round 3",
[]*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t4Ms, 72)))},
[]*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t4Ms, 72)))},
0,
},
*/
{
"Cumulative: round 2 - instance adjusted based on round 1",
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(t2Ms)
pt0.LabelsMap().Insert("k1", "v1")
pt0.LabelsMap().Insert("k2", "v2")
pt0.SetTimestamp(t2Ms)
pt0.SetDoubleVal(66.0)

return &mL
}(),
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(t1Ms)
pt0.LabelsMap().Insert("k1", "v1")
pt0.LabelsMap().Insert("k2", "v2")
pt0.SetTimestamp(t2Ms)
pt0.SetDoubleVal(66.0)

return &mL
}(),
0,
},
{
"Cumulative: round 3 - instance reset (value less than previous value), start time is reset",
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(t3Ms)
pt0.LabelsMap().Insert("k1", "v1")
pt0.LabelsMap().Insert("k2", "v2")
pt0.SetTimestamp(t3Ms)
pt0.SetDoubleVal(55.0)

return &mL
}(),
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(t3Ms)
pt0.LabelsMap().Insert("k1", "v1")
pt0.LabelsMap().Insert("k2", "v2")
pt0.SetTimestamp(t3Ms)
pt0.SetDoubleVal(55.0)

return &mL
}(),
1,
},
{
"Cumulative: round 4 - instance adjusted based on round 3",
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(t4Ms)
pt0.LabelsMap().Insert("k1", "v1")
pt0.LabelsMap().Insert("k2", "v2")
pt0.SetTimestamp(t4Ms)
pt0.SetDoubleVal(72.0)

return &mL
}(),
func() *pdata.MetricSlice {
mL := pdata.NewMetricSlice()
m0 := mL.AppendEmpty()
m0.SetDataType(pdata.MetricDataTypeSum)
m0.SetName("cumulative1")
g0 := m0.Sum()
pt0 := g0.DataPoints().AppendEmpty()
pt0.SetStartTimestamp(t3Ms)
pt0.LabelsMap().Insert("k1", "v1")
pt0.LabelsMap().Insert("k2", "v2")
pt0.SetTimestamp(t4Ms)
pt0.SetDoubleVal(72.0)

return &mL
}(),
0,
},
}
runScript(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}
Expand All @@ -209,7 +294,6 @@ var (
t1Ms = pdata.Timestamp(time.Unix(0, 1000000).UnixNano())
t2Ms = pdata.Timestamp(time.Unix(0, 2000000).UnixNano())
t3Ms = pdata.Timestamp(time.Unix(0, 3000000).UnixNano())
t4Ms = pdata.Timestamp(time.Unix(0, 5000000).UnixNano())
t5Ms = pdata.Timestamp(time.Unix(0, 5000000).UnixNano())
)
Expand Down

0 comments on commit b7566bb

Please sign in to comment.