From 526e1fbe53f42cbea72617a2c38a3e61fc998514 Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Wed, 28 Apr 2021 15:27:59 -0400 Subject: [PATCH 1/5] receiver/prometheus: Do not drop metrics used to establish start time Signed-off-by: Anthony J Mirabella --- .../internal/metrics_adjuster.go | 112 ++--- .../internal/metrics_adjuster_test.go | 156 +++--- .../metrics_receiver_test.go | 456 +++++++++++++++--- 3 files changed, 502 insertions(+), 222 deletions(-) diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index 4e734faff3f..90e3b7c6d83 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -22,7 +22,6 @@ import ( metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/wrapperspb" ) // Notes on garbage collection (gc): @@ -202,27 +201,22 @@ func NewMetricsAdjuster(tsm *timeseriesMap, logger *zap.Logger) *MetricsAdjuster } // AdjustMetrics takes a sequence of metrics and adjust their values based on the initial and -// previous points in the timeseriesMap. If the metric is the first point in the timeseries, or the -// timeseries has been reset, it is removed from the sequence and added to the timeseriesMap. -// Additionally returns the total number of timeseries dropped from the metrics. +// previous points in the timeseriesMap. +// Returns the total number of timeseries that had reset start times. 
func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) ([]*metricspb.Metric, int) { var adjusted = make([]*metricspb.Metric, 0, len(metrics)) - dropped := 0 + reset := 0 ma.tsm.Lock() defer ma.tsm.Unlock() for _, metric := range metrics { - adj, d := ma.adjustMetric(metric) - dropped += d - if adj { - adjusted = append(adjusted, metric) - } + d := ma.adjustMetric(metric) + reset += d + adjusted = append(adjusted, metric) } - return adjusted, dropped + return adjusted, reset } -// Returns true if at least one of the metric's timeseries was adjusted and false if all of the -// timeseries are an initial occurrence or a reset. Additionally returns the number of timeseries -// dropped from the metric. +// Returns the number of timeseries with reset start times. // // Types of metrics returned supported by prometheus: // - MetricDescriptor_GAUGE_DOUBLE @@ -230,44 +224,32 @@ func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) ([]*metric // - MetricDescriptor_CUMULATIVE_DOUBLE // - MetricDescriptor_CUMULATIVE_DISTRIBUTION // - MetricDescriptor_SUMMARY -func (ma *MetricsAdjuster) adjustMetric(metric *metricspb.Metric) (bool, int) { +func (ma *MetricsAdjuster) adjustMetric(metric *metricspb.Metric) int { switch metric.MetricDescriptor.Type { case metricspb.MetricDescriptor_GAUGE_DOUBLE, metricspb.MetricDescriptor_GAUGE_DISTRIBUTION: // gauges don't need to be adjusted so no additional processing is necessary - return true, 0 + return 0 default: return ma.adjustMetricTimeseries(metric) } } -// Returns true if at least one of the metric's timeseries was adjusted and false if all of the -// timeseries are an initial occurrence or a reset. Additionally returns the number of timeseries -// dropped. -func (ma *MetricsAdjuster) adjustMetricTimeseries(metric *metricspb.Metric) (bool, int) { - dropped := 0 +// Returns the number of timeseries that had reset start times. 
+func (ma *MetricsAdjuster) adjustMetricTimeseries(metric *metricspb.Metric) int { + reset := 0 filtered := make([]*metricspb.TimeSeries, 0, len(metric.GetTimeseries())) for _, current := range metric.GetTimeseries() { tsi := ma.tsm.get(metric, current.GetLabelValues()) - if tsi.initial == nil { - // initial timeseries + if tsi.initial == nil || !ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, tsi.previous) { + // initial || reset timeseries tsi.initial = current - tsi.previous = current - dropped++ - } else { - if ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, - tsi.previous) { - tsi.previous = current - filtered = append(filtered, current) - } else { - // reset timeseries - tsi.initial = current - tsi.previous = current - dropped++ - } + reset++ } + tsi.previous = current + filtered = append(filtered, current) } metric.Timeseries = filtered - return len(filtered) > 0, dropped + return reset } // Returns true if 'current' was adjusted and false if 'current' is an the initial occurrence or a @@ -289,7 +271,7 @@ func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Ty zap.Int("len(current)", len(current)), zap.Int("len(initial)", len(initial)), zap.Int("len(previous)", len(previous))) return true } - return ma.adjustPoint(metricType, current[0], initial[0], previous[0]) + return ma.adjustPoint(metricType, current[0], previous[0]) } // Note: There is an important, subtle point here. When a new timeseries or a reset is detected, @@ -300,72 +282,32 @@ func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Ty // timeseries were created instead, previous could be used directly but this would mean reallocating // all of the metrics. 
func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Type, - current, initial, previous *metricspb.Point) bool { + current, previous *metricspb.Point) bool { switch metricType { case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE: - currentValue := current.GetDoubleValue() - initialValue := initial.GetDoubleValue() - previousValue := initialValue - if initial != previous { - previousValue += previous.GetDoubleValue() - } - if currentValue < previousValue { + if current.GetDoubleValue() < previous.GetDoubleValue() { // reset detected return false } - current.Value = - &metricspb.Point_DoubleValue{DoubleValue: currentValue - initialValue} case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: // note: sum of squared deviation not currently supported currentDist := current.GetDistributionValue() - initialDist := initial.GetDistributionValue() - previousCount := initialDist.Count - previousSum := initialDist.Sum - if initial != previous { - previousCount += previous.GetDistributionValue().Count - previousSum += previous.GetDistributionValue().Sum - } - if currentDist.Count < previousCount || currentDist.Sum < previousSum { + previousDist := previous.GetDistributionValue() + if currentDist.Count < previousDist.Count || currentDist.Sum < previousDist.Sum { // reset detected return false } - currentDist.Count -= initialDist.Count - currentDist.Sum -= initialDist.Sum - ma.adjustBuckets(currentDist.Buckets, initialDist.Buckets) case metricspb.MetricDescriptor_SUMMARY: // note: for summary, we don't adjust the snapshot - currentCount := current.GetSummaryValue().Count.GetValue() - currentSum := current.GetSummaryValue().Sum.GetValue() - initialCount := initial.GetSummaryValue().Count.GetValue() - initialSum := initial.GetSummaryValue().Sum.GetValue() - previousCount := initialCount - previousSum := initialSum - if initial != previous { - previousCount += previous.GetSummaryValue().Count.GetValue() - previousSum += 
previous.GetSummaryValue().Sum.GetValue() - } - if currentCount < previousCount || currentSum < previousSum { + currentSummary := current.GetSummaryValue() + previousSummary := previous.GetSummaryValue() + if currentSummary.Count.GetValue() < previousSummary.Count.GetValue() || currentSummary.Sum.GetValue() < previousSummary.Sum.GetValue() { // reset detected return false } - current.GetSummaryValue().Count = - &wrapperspb.Int64Value{Value: currentCount - initialCount} - current.GetSummaryValue().Sum = - &wrapperspb.DoubleValue{Value: currentSum - initialSum} default: // this shouldn't happen ma.logger.Info("Adjust - skipping unexpected point", zap.String("type", metricType.String())) } return true } - -func (ma *MetricsAdjuster) adjustBuckets(current, initial []*metricspb.DistributionValue_Bucket) { - if len(current) != len(initial) { - // this shouldn't happen - ma.logger.Info("Bucket sizes not equal", zap.Int("len(current)", len(current)), zap.Int("len(initial)", len(initial))) - return - } - for i := 0; i < len(current); i++ { - current[i].Count -= initial[i].Count - } -} diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go index c34011af678..277f61be251 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go @@ -31,14 +31,17 @@ func Test_gauge(t *testing.T) { "Gauge: round 1 - gauge not adjusted", []*metricspb.Metric{mtu.Gauge(g1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)))}, []*metricspb.Metric{mtu.Gauge(g1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)))}, + 0, }, { "Gauge: round 2 - gauge not adjusted", []*metricspb.Metric{mtu.Gauge(g1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.Double(t2Ms, 66)))}, []*metricspb.Metric{mtu.Gauge(g1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.Double(t2Ms, 66)))}, + 0, }, { "Gauge: round 3 - value less than previous value - gauge is not adjusted", 
[]*metricspb.Metric{mtu.Gauge(g1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55)))}, []*metricspb.Metric{mtu.Gauge(g1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55)))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } @@ -48,77 +51,92 @@ func Test_gaugeDistribution(t *testing.T) { "GaugeDist: round 1 - gauge distribution not adjusted", []*metricspb.Metric{mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})))}, []*metricspb.Metric{mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})))}, + 0, }, { "GaugeDist: round 2 - gauge distribution not adjusted", []*metricspb.Metric{mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{6, 5, 8, 11})))}, []*metricspb.Metric{mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{6, 5, 8, 11})))}, + 0, }, { "GaugeDist: round 3 - count/sum less than previous - gauge distribution not adjusted", []*metricspb.Metric{mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{2, 0, 1, 5})))}, []*metricspb.Metric{mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{2, 0, 1, 5})))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_cumulative(t *testing.T) { script := []*metricsAdjusterTest{{ - "Cumulative: round 1 - initial instance, adjusted should be empty", + "Cumulative: round 1 - initial instance, start time is established", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)))}, + 1, }, { "Cumulative: round 2 - instance adjusted based on round 1", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.Double(t2Ms, 66)))}, - []*metricspb.Metric{mtu.Cumulative(c1, k1k2, 
mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 22)))}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 66)))}, + 0, }, { - "Cumulative: round 3 - instance reset (value less than previous value), adjusted should be empty", + "Cumulative: round 3 - instance reset (value less than previous value), start time is reset", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55)))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55)))}, + 1, }, { "Cumulative: round 4 - instance adjusted based on round 3", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t4Ms, 72)))}, - []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t4Ms, 17)))}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t4Ms, 72)))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_cumulativeDistribution(t *testing.T) { script := []*metricsAdjusterTest{{ - "CumulativeDist: round 1 - initial instance, adjusted should be empty", + "CumulativeDist: round 1 - initial instance, start time is established", []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})))}, + 1, }, { "CumulativeDist: round 2 - instance adjusted based on round 1", []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{6, 3, 4, 8})))}, - []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{2, 1, 1, 1})))}, + []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{6, 3, 4, 8})))}, + 0, 
}, { - "CumulativeDist: round 3 - instance reset (value less than previous value), adjusted should be empty", + "CumulativeDist: round 3 - instance reset (value less than previous value), start time is reset", []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{5, 3, 2, 7})))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{5, 3, 2, 7})))}, + 1, }, { "CumulativeDist: round 4 - instance adjusted based on round 3", []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{7, 4, 2, 12})))}, - []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{2, 1, 0, 5})))}, + []*metricspb.Metric{mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{7, 4, 2, 12})))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_summary(t *testing.T) { script := []*metricsAdjusterTest{{ - "Summary: round 1 - initial instance, adjusted should be empty", + "Summary: round 1 - initial instance, start time is established", []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t1Ms, 10, 40, percent0, []float64{1, 5, 8})))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t1Ms, 10, 40, percent0, []float64{1, 5, 8})))}, + 1, }, { "Summary: round 2 - instance adjusted based on round 1", []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.SummPt(t2Ms, 15, 70, percent0, []float64{7, 44, 9})))}, - []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t2Ms, 5, 30, percent0, []float64{7, 44, 9})))}, + []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t2Ms, 15, 70, percent0, []float64{7, 44, 9})))}, + 0, }, { - "Summary: 
round 3 - instance reset (count less than previous), adjusted should be empty", + "Summary: round 3 - instance reset (count less than previous), start time is reset", []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t3Ms, 12, 66, percent0, []float64{3, 22, 5})))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t3Ms, 12, 66, percent0, []float64{3, 22, 5})))}, + 1, }, { "Summary: round 4 - instance adjusted based on round 3", []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.SummPt(t4Ms, 14, 96, percent0, []float64{9, 47, 8})))}, - []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t4Ms, 2, 30, percent0, []float64{9, 47, 8})))}, + []*metricspb.Metric{mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t4Ms, 14, 96, percent0, []float64{9, 47, 8})))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } @@ -136,7 +154,11 @@ func Test_multiMetrics(t *testing.T) { []*metricspb.Metric{ mtu.Gauge(g1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44))), mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7}))), + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7}))), + mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t1Ms, 10, 40, percent0, []float64{1, 5, 8}))), }, + 3, }, { "MultiMetrics: round 2 - combined round 2 of individual metrics", []*metricspb.Metric{ @@ -149,10 +171,11 @@ func Test_multiMetrics(t *testing.T) { []*metricspb.Metric{ mtu.Gauge(g1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.Double(t2Ms, 66))), mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{6, 5, 8, 11}))), - mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 22))), - mtu.CumulativeDist(cd1, k1k2, 
mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{2, 1, 1, 1}))), - mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t2Ms, 5, 30, percent0, []float64{7, 44, 9}))), + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 66))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{6, 3, 4, 8}))), + mtu.Summary(s1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.SummPt(t2Ms, 15, 70, percent0, []float64{7, 44, 9}))), }, + 0, }, { "MultiMetrics: round 3 - combined round 3 of individual metrics", []*metricspb.Metric{ @@ -165,7 +188,11 @@ func Test_multiMetrics(t *testing.T) { []*metricspb.Metric{ mtu.Gauge(g1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55))), mtu.GaugeDist(gd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{2, 0, 1, 5}))), + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 55))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{5, 3, 2, 7}))), + mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t3Ms, 12, 66, percent0, []float64{3, 22, 5}))), }, + 3, }, { "MultiMetrics: round 4 - combined round 4 of individual metrics", []*metricspb.Metric{ @@ -174,72 +201,86 @@ func Test_multiMetrics(t *testing.T) { mtu.Summary(s1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.SummPt(t4Ms, 14, 96, percent0, []float64{9, 47, 8}))), }, []*metricspb.Metric{ - mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t4Ms, 17))), - mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{2, 1, 0, 5}))), - mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t4Ms, 2, 30, percent0, []float64{9, 47, 8}))), + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t4Ms, 72))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{7, 4, 2, 12}))), + mtu.Summary(s1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.SummPt(t4Ms, 14, 96, percent0, 
[]float64{9, 47, 8}))), }, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_multiTimeseries(t *testing.T) { script := []*metricsAdjusterTest{{ - "MultiTimeseries: round 1 - initial first instance, adjusted should be empty", + "MultiTimeseries: round 1 - initial first instance, start time is established", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)))}, + 1, }, { "MultiTimeseries: round 2 - first instance adjusted based on round 1, initial second instance", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.Double(t2Ms, 66)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t2Ms, 20)))}, - []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 22)))}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 66)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t2Ms, 20)))}, + 1, }, { "MultiTimeseries: round 3 - first instance adjusted based on round 1, second based on round 2", []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.Double(t3Ms, 88)), mtu.Timeseries(t3Ms, v10v20, mtu.Double(t3Ms, 49)))}, - []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t3Ms, 44)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t3Ms, 29)))}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t3Ms, 88)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t3Ms, 49)))}, + 0, }, { "MultiTimeseries: round 4 - first instance reset, second instance adjusted based on round 2, initial third instance", []*metricspb.Metric{ mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t4Ms, 87)), mtu.Timeseries(t4Ms, v10v20, mtu.Double(t4Ms, 57)), mtu.Timeseries(t4Ms, v100v200, mtu.Double(t4Ms, 10)))}, []*metricspb.Metric{ - mtu.Cumulative(c1, 
k1k2, mtu.Timeseries(t2Ms, v10v20, mtu.Double(t4Ms, 37)))}, + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t4Ms, 87)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t4Ms, 57)), mtu.Timeseries(t4Ms, v100v200, mtu.Double(t4Ms, 10)))}, + 2, }, { - "MultiTimeseries: round 5 - first instance adusted based on round 4, second on round 2, third on round 4", + "MultiTimeseries: round 5 - first instance adjusted based on round 4, second on round 2, third on round 4", []*metricspb.Metric{ mtu.Cumulative(c1, k1k2, mtu.Timeseries(t5Ms, v1v2, mtu.Double(t5Ms, 90)), mtu.Timeseries(t5Ms, v10v20, mtu.Double(t5Ms, 65)), mtu.Timeseries(t5Ms, v100v200, mtu.Double(t5Ms, 22)))}, []*metricspb.Metric{ - mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t5Ms, 3)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t5Ms, 45)), mtu.Timeseries(t4Ms, v100v200, mtu.Double(t5Ms, 12)))}, + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t5Ms, 90)), mtu.Timeseries(t2Ms, v10v20, mtu.Double(t5Ms, 65)), mtu.Timeseries(t4Ms, v100v200, mtu.Double(t5Ms, 22)))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_emptyLabels(t *testing.T) { script := []*metricsAdjusterTest{{ - "EmptyLabels: round 1 - initial instance, implicitly empty labels, adjusted should be empty", + "EmptyLabels: round 1 - initial instance, implicitly empty labels, start time is established", []*metricspb.Metric{mtu.Cumulative(c1, []string{}, mtu.Timeseries(t1Ms, []string{}, mtu.Double(t1Ms, 44)))}, - []*metricspb.Metric{}, + []*metricspb.Metric{mtu.Cumulative(c1, []string{}, mtu.Timeseries(t1Ms, []string{}, mtu.Double(t1Ms, 44)))}, + 1, }, { "EmptyLabels: round 2 - instance adjusted based on round 1", []*metricspb.Metric{mtu.Cumulative(c1, []string{}, mtu.Timeseries(t2Ms, []string{}, mtu.Double(t2Ms, 66)))}, - []*metricspb.Metric{mtu.Cumulative(c1, []string{}, mtu.Timeseries(t1Ms, []string{}, mtu.Double(t2Ms, 22)))}, + []*metricspb.Metric{mtu.Cumulative(c1, []string{}, 
mtu.Timeseries(t1Ms, []string{}, mtu.Double(t2Ms, 66)))}, + 0, }, { "EmptyLabels: round 3 - one explicitly empty label, instance adjusted based on round 1", []*metricspb.Metric{mtu.Cumulative(c1, k1, mtu.Timeseries(t3Ms, []string{""}, mtu.Double(t3Ms, 77)))}, - []*metricspb.Metric{mtu.Cumulative(c1, k1, mtu.Timeseries(t1Ms, []string{""}, mtu.Double(t3Ms, 33)))}, + []*metricspb.Metric{mtu.Cumulative(c1, k1, mtu.Timeseries(t1Ms, []string{""}, mtu.Double(t3Ms, 77)))}, + 0, }, { "EmptyLabels: round 4 - three explicitly empty labels, instance adjusted based on round 1", []*metricspb.Metric{mtu.Cumulative(c1, k1k2k3, mtu.Timeseries(t3Ms, []string{"", "", ""}, mtu.Double(t3Ms, 88)))}, - []*metricspb.Metric{mtu.Cumulative(c1, k1k2k3, mtu.Timeseries(t1Ms, []string{"", "", ""}, mtu.Double(t3Ms, 44)))}, + []*metricspb.Metric{mtu.Cumulative(c1, k1k2k3, mtu.Timeseries(t1Ms, []string{"", "", ""}, mtu.Double(t3Ms, 88)))}, + 0, }} runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_tsGC(t *testing.T) { script1 := []*metricsAdjusterTest{{ - "TsGC: round 1 - initial instances, adjusted should be empty", + "TsGC: round 1 - initial instances, start time is established", []*metricspb.Metric{ mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)), mtu.Timeseries(t1Ms, v10v20, mtu.Double(t1Ms, 20))), mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})), mtu.Timeseries(t1Ms, v10v20, mtu.DistPt(t1Ms, bounds0, []int64{40, 20, 30, 70}))), }, - []*metricspb.Metric{}, + []*metricspb.Metric{ + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)), mtu.Timeseries(t1Ms, v10v20, mtu.Double(t1Ms, 20))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})), mtu.Timeseries(t1Ms, v10v20, mtu.DistPt(t1Ms, bounds0, []int64{40, 20, 30, 70}))), + }, + 4, }} script2 := []*metricsAdjusterTest{{ @@ -249,9 +290,10 @@ func Test_tsGC(t *testing.T) 
{ mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t2Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{8, 7, 9, 14}))), }, []*metricspb.Metric{ - mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 44))), - mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{4, 5, 6, 7}))), + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t2Ms, 88))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t2Ms, bounds0, []int64{8, 7, 9, 14}))), }, + 0, }} script3 := []*metricsAdjusterTest{{ @@ -261,9 +303,10 @@ func Test_tsGC(t *testing.T) { mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t3Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{9, 8, 10, 15})), mtu.Timeseries(t3Ms, v10v20, mtu.DistPt(t3Ms, bounds0, []int64{55, 66, 33, 77}))), }, []*metricspb.Metric{ - mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t3Ms, 55))), - mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{5, 6, 7, 8}))), + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t3Ms, 99)), mtu.Timeseries(t3Ms, v10v20, mtu.Double(t3Ms, 80))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t3Ms, bounds0, []int64{9, 8, 10, 15})), mtu.Timeseries(t3Ms, v10v20, mtu.DistPt(t3Ms, bounds0, []int64{55, 66, 33, 77}))), }, + 2, }} jobsMap := NewJobsMap(time.Minute) @@ -287,13 +330,18 @@ func Test_jobGC(t *testing.T) { mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)), mtu.Timeseries(t1Ms, v10v20, mtu.Double(t1Ms, 20))), mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})), mtu.Timeseries(t1Ms, v10v20, mtu.DistPt(t1Ms, bounds0, []int64{40, 20, 30, 70}))), }, - []*metricspb.Metric{}, + []*metricspb.Metric{ + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t1Ms, v1v2, mtu.Double(t1Ms, 44)), mtu.Timeseries(t1Ms, v10v20, mtu.Double(t1Ms, 20))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t1Ms, v1v2, 
mtu.DistPt(t1Ms, bounds0, []int64{4, 2, 3, 7})), mtu.Timeseries(t1Ms, v10v20, mtu.DistPt(t1Ms, bounds0, []int64{40, 20, 30, 70}))), + }, + 4, }} job2Script1 := []*metricsAdjusterTest{{ "JobGC: job2, round 1 - no metrics adjusted, just trigger gc", []*metricspb.Metric{}, []*metricspb.Metric{}, + 0, }} job1Script2 := []*metricsAdjusterTest{{ @@ -302,7 +350,11 @@ func Test_jobGC(t *testing.T) { mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t4Ms, 99)), mtu.Timeseries(t4Ms, v10v20, mtu.Double(t4Ms, 80))), mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{9, 8, 10, 15})), mtu.Timeseries(t4Ms, v10v20, mtu.DistPt(t4Ms, bounds0, []int64{55, 66, 33, 77}))), }, - []*metricspb.Metric{}, + []*metricspb.Metric{ + mtu.Cumulative(c1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.Double(t4Ms, 99)), mtu.Timeseries(t4Ms, v10v20, mtu.Double(t4Ms, 80))), + mtu.CumulativeDist(cd1, k1k2, mtu.Timeseries(t4Ms, v1v2, mtu.DistPt(t4Ms, bounds0, []int64{9, 8, 10, 15})), mtu.Timeseries(t4Ms, v10v20, mtu.DistPt(t4Ms, bounds0, []int64{55, 66, 33, 77}))), + }, + 4, }} gcInterval := 10 * time.Millisecond @@ -349,19 +401,7 @@ type metricsAdjusterTest struct { description string metrics []*metricspb.Metric adjusted []*metricspb.Metric -} - -func (mat *metricsAdjusterTest) dropped() int { - metricsTimeseries := 0 - for _, metric := range mat.metrics { - metricsTimeseries += len(metric.GetTimeseries()) - } - - adjustedTimeseries := 0 - for _, adjusted := range mat.adjusted { - adjustedTimeseries += len(adjusted.GetTimeseries()) - } - return metricsTimeseries - adjustedTimeseries + reset int } func runScript(t *testing.T, tsm *timeseriesMap, script []*metricsAdjusterTest) { @@ -370,7 +410,7 @@ func runScript(t *testing.T, tsm *timeseriesMap, script []*metricsAdjusterTest) ma := NewMetricsAdjuster(tsm, l) for _, test := range script { - expectedDropped := test.dropped() + expectedDropped := test.reset adjusted, dropped := ma.AdjustMetrics(test.metrics) 
assert.EqualValuesf(t, test.adjusted, adjusted, "Test: %v - expected: %v, actual: %v", test.description, test.adjusted, adjusted) assert.Equalf(t, expectedDropped, dropped, "Test: %v", test.description) diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index 1270aab7667..f73d65cdd46 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -252,32 +252,138 @@ rpc_duration_seconds_count 1001 func verifyTarget1(t *testing.T, td *testData, mds []internaldata.MetricsData) { verifyNumScrapeResults(t, td, mds) m1 := mds[0] - // m1 shall only have a gauge - if l := len(m1.Metrics); l != 1 { + if l := len(m1.Metrics); l != 4 { t.Errorf("want 1, but got %v\n", l) } - // only gauge value is returned from the first scrape - wantG1 := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "go_threads", - Description: "Number of OS threads created", - Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, - }, - Timeseries: []*metricspb.TimeSeries{ + ts1 := m1.Metrics[0].Timeseries[0].Points[0].Timestamp + want1 := &internaldata.MetricsData{ + Node: td.node, + Resource: td.resource, + Metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "go_threads", + Description: "Number of OS threads created", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE}, + Timeseries: []*metricspb.TimeSeries{ + { + Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 19.0}}, + }, + }, + }, + }, { - Points: []*metricspb.Point{ - {Value: &metricspb.Point_DoubleValue{DoubleValue: 19.0}}, + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_requests_total", + Description: "The total number of HTTP requests.", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "code"}, {Key: "method"}}, + }, + Timeseries: 
[]*metricspb.TimeSeries{ + { + StartTimestamp: ts1, + LabelValues: []*metricspb.LabelValue{ + {Value: "200", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, + }, + }, + { + StartTimestamp: ts1, + LabelValues: []*metricspb.LabelValue{ + {Value: "400", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 5.0}}, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_request_duration_seconds", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + Description: "A histogram of the request duration.", + Unit: "s", + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: ts1, + Points: []*metricspb.Point{ + { + Timestamp: ts1, + Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{0.05, 0.5, 1}, + }, + }, + }, + Count: 2500, + Sum: 5000.0, + Buckets: []*metricspb.DistributionValue_Bucket{ + {Count: 1000}, + {Count: 500}, + {Count: 500}, + {Count: 500}, + }, + }}, + }, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "rpc_duration_seconds", + Type: metricspb.MetricDescriptor_SUMMARY, + Description: "A summary of the RPC duration in seconds.", + Unit: "s", + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: ts1, + Points: []*metricspb.Point{ + { + Timestamp: ts1, + Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: 5000}, + Count: &wrappers.Int64Value{Value: 1000}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: 
[]*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ + { + Percentile: 1, + Value: 1, + }, + { + Percentile: 90, + Value: 5, + }, + { + Percentile: 99, + Value: 8, + }, + }, + }, + }, + }, + }, + }, + }, }, }, }, } - gotG1 := m1.Metrics[0] - // relying on the timestamps from gagues as startTimestamps - ts1 := gotG1.Timeseries[0].Points[0].Timestamp - // set this timestamp to wantG1 - wantG1.Timeseries[0].Points[0].Timestamp = ts1 - doCompare("scrape1", t, wantG1, gotG1) + + doCompare("scrape1", t, want1, &m1) // verify the 2nd metricData m2 := mds[1] @@ -315,7 +421,7 @@ func verifyTarget1(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 99.0}}, + {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 199.0}}, }, }, { @@ -325,7 +431,7 @@ func verifyTarget1(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 7.0}}, + {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 12.0}}, }, }, }, @@ -352,13 +458,13 @@ func verifyTarget1(t *testing.T, td *testData, mds []internaldata.MetricsData) { }, }, }, - Count: 100, - Sum: 50.0, + Count: 2600, + Sum: 5050.0, Buckets: []*metricspb.DistributionValue_Bucket{ - {Count: 100}, - {Count: 0}, - {Count: 0}, - {Count: 0}, + {Count: 1100}, + {Count: 500}, + {Count: 500}, + {Count: 500}, }, }}, }, @@ -381,8 +487,8 @@ func verifyTarget1(t *testing.T, td *testData, mds []internaldata.MetricsData) { Timestamp: ts2, Value: &metricspb.Point_SummaryValue{ SummaryValue: &metricspb.SummaryValue{ - Sum: &wrappers.DoubleValue{Value: 2}, - Count: &wrappers.Int64Value{Value: 1}, + Sum: &wrappers.DoubleValue{Value: 5002}, + Count: &wrappers.Int64Value{Value: 1001}, Snapshot: &metricspb.SummaryValue_Snapshot{ PercentileValues: 
[]*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ { @@ -479,31 +585,63 @@ http_requests_total{method="post",code="500"} 5 func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { verifyNumScrapeResults(t, td, mds) m1 := mds[0] - // m1 shall only have a gauge - if l := len(m1.Metrics); l != 1 { + if l := len(m1.Metrics); l != 2 { t.Errorf("want 1, but got %v\n", l) } - // only gauge value is returned from the first scrape - wantG1 := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "go_threads", - Description: "Number of OS threads created", - Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, - }, - Timeseries: []*metricspb.TimeSeries{ + ts1 := m1.Metrics[0].Timeseries[0].Points[0].Timestamp + want1 := &internaldata.MetricsData{ + Node: td.node, + Resource: td.resource, + Metrics: []*metricspb.Metric{ { - Points: []*metricspb.Point{ - {Value: &metricspb.Point_DoubleValue{DoubleValue: 18.0}}, + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "go_threads", + Description: "Number of OS threads created", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + }, + Timeseries: []*metricspb.TimeSeries{ + { + Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 18.0}}, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_requests_total", + Description: "The total number of HTTP requests.", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "code"}, {Key: "method"}}, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: ts1, + LabelValues: []*metricspb.LabelValue{ + {Value: "200", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 10.0}}, + }, + }, + { + StartTimestamp: ts1, + LabelValues: []*metricspb.LabelValue{ + {Value: "400", HasValue: true}, + {Value: "post", HasValue: true}, + }, + 
Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 50.0}}, + }, + }, }, }, }, } - gotG1 := m1.Metrics[0] - ts1 := gotG1.Timeseries[0].Points[0].Timestamp - // set this timestamp to wantG1 - wantG1.Timeseries[0].Points[0].Timestamp = ts1 - doCompare("scrape1", t, wantG1, gotG1) + + doCompare("scrape1", t, want1, &m1) // verify the 2nd metricData m2 := mds[1] @@ -542,7 +680,7 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 40.0}}, + {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 50.0}}, }, }, { @@ -552,7 +690,17 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 10.0}}, + {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 60.0}}, + }, + }, + { + StartTimestamp: ts2, + LabelValues: []*metricspb.LabelValue{ + {Value: "500", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts2, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, }, }, }, @@ -599,7 +747,7 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts3, Value: &metricspb.Point_DoubleValue{DoubleValue: 40.0}}, + {Timestamp: ts3, Value: &metricspb.Point_DoubleValue{DoubleValue: 50.0}}, }, }, { @@ -609,7 +757,7 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts3, Value: &metricspb.Point_DoubleValue{DoubleValue: 10.0}}, + {Timestamp: ts3, Value: &metricspb.Point_DoubleValue{DoubleValue: 60.0}}, }, }, { @@ -619,7 +767,7 @@ func 
verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts3, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + {Timestamp: ts3, Value: &metricspb.Point_DoubleValue{DoubleValue: 5.0}}, }, }, }, @@ -650,6 +798,46 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { }, }, }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_requests_total", + Description: "The total number of HTTP requests.", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "code"}, {Key: "method"}}, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: ts4, + LabelValues: []*metricspb.LabelValue{ + {Value: "200", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts4, Value: &metricspb.Point_DoubleValue{DoubleValue: 49.0}}, + }, + }, + { + StartTimestamp: ts4, + LabelValues: []*metricspb.LabelValue{ + {Value: "400", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts4, Value: &metricspb.Point_DoubleValue{DoubleValue: 59.0}}, + }, + }, + { + StartTimestamp: ts4, + LabelValues: []*metricspb.LabelValue{ + {Value: "500", HasValue: true}, + {Value: "post", HasValue: true}, + }, + Points: []*metricspb.Point{ + {Timestamp: ts4, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, + }, + }, + }, + }, }, } doCompare("scrape4", t, want4, &m4) @@ -692,7 +880,7 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts5, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + {Timestamp: ts5, Value: &metricspb.Point_DoubleValue{DoubleValue: 50.0}}, }, }, { @@ -702,7 +890,7 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, 
Points: []*metricspb.Point{ - {Timestamp: ts5, Value: &metricspb.Point_DoubleValue{DoubleValue: 0.0}}, + {Timestamp: ts5, Value: &metricspb.Point_DoubleValue{DoubleValue: 59.0}}, }, }, { @@ -712,7 +900,7 @@ func verifyTarget2(t *testing.T, td *testData, mds []internaldata.MetricsData) { {Value: "post", HasValue: true}, }, Points: []*metricspb.Point{ - {Timestamp: ts5, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + {Timestamp: ts5, Value: &metricspb.Point_DoubleValue{DoubleValue: 5.0}}, }, }, }, @@ -797,30 +985,138 @@ rpc_duration_seconds_count{foo="no_quantile"} 55 func verifyTarget3(t *testing.T, td *testData, mds []internaldata.MetricsData) { verifyNumScrapeResults(t, td, mds) m1 := mds[0] - // m1 shall only have a gauge - if l := len(m1.Metrics); l != 1 { + if l := len(m1.Metrics); l != 3 { t.Errorf("want 1, but got %v\n", l) } - // only gauge value is returned from the first scrape - wantG1 := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "go_threads", - Description: "Number of OS threads created", - Type: metricspb.MetricDescriptor_GAUGE_DOUBLE}, - Timeseries: []*metricspb.TimeSeries{ + ts1 := m1.Metrics[1].Timeseries[0].Points[0].Timestamp + want1 := &internaldata.MetricsData{ + Node: td.node, + Resource: td.resource, + Metrics: []*metricspb.Metric{ { - Points: []*metricspb.Point{ - {Value: &metricspb.Point_DoubleValue{DoubleValue: 18.0}}, + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "go_threads", + Description: "Number of OS threads created", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + }, + Timeseries: []*metricspb.TimeSeries{ + { + Points: []*metricspb.Point{ + {Timestamp: ts1, Value: &metricspb.Point_DoubleValue{DoubleValue: 18.0}}, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_request_duration_seconds", + Description: "A histogram of the request duration.", + Unit: "s", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + }, + Timeseries: 
[]*metricspb.TimeSeries{ + { + StartTimestamp: ts1, + Points: []*metricspb.Point{ + { + Timestamp: ts1, + Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{0.2, 0.5, 1}, + }, + }, + }, + Count: 13003, + Sum: 50000, + Buckets: []*metricspb.DistributionValue_Bucket{ + {Count: 10000}, + {Count: 1000}, + {Count: 1001}, + {Count: 1002}, + }, + }, + }, + }, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "rpc_duration_seconds", + Type: metricspb.MetricDescriptor_SUMMARY, + LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}, + Description: "A summary of the RPC duration in seconds.", + Unit: "s", + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: ts1, + LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + { + Timestamp: ts1, + Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: 8000}, + Count: &wrappers.Int64Value{Value: 900}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ + { + Percentile: 1, + Value: 31, + }, + { + Percentile: 5, + Value: 35, + }, + { + Percentile: 50, + Value: 47, + }, + { + Percentile: 90, + Value: 70, + }, + { + Percentile: 99, + Value: 76, + }, + }, + }, + }, + }, + }, + }, + }, + { + StartTimestamp: ts1, + LabelValues: []*metricspb.LabelValue{{Value: "no_quantile", HasValue: true}}, + Points: []*metricspb.Point{ + { + Timestamp: ts1, + Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: 100}, + Count: &wrappers.Int64Value{Value: 50}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: nil, + }, + }, + 
}, + }, + }, + }, }, }, }, } - gotG1 := m1.Metrics[0] - ts1 := gotG1.Timeseries[0].Points[0].Timestamp - // set this timestamp to wantG1 - wantG1.Timeseries[0].Points[0].Timestamp = ts1 - doCompare("scrape1", t, wantG1, gotG1) + + doCompare("scrape1", t, want1, &m1) // verify the 2nd metricData m2 := mds[1] @@ -866,13 +1162,13 @@ func verifyTarget3(t *testing.T, td *testData, mds []internaldata.MetricsData) { }, }, }, - Count: 1000, - Sum: 100, + Count: 14003, + Sum: 50100, Buckets: []*metricspb.DistributionValue_Bucket{ + {Count: 11000}, {Count: 1000}, - {Count: 0}, - {Count: 0}, - {Count: 0}, + {Count: 1001}, + {Count: 1002}, }, }, }, @@ -898,8 +1194,8 @@ func verifyTarget3(t *testing.T, td *testData, mds []internaldata.MetricsData) { Timestamp: ts2, Value: &metricspb.Point_SummaryValue{ SummaryValue: &metricspb.SummaryValue{ - Sum: &wrappers.DoubleValue{Value: 100}, - Count: &wrappers.Int64Value{Value: 50}, + Sum: &wrappers.DoubleValue{Value: 8100}, + Count: &wrappers.Int64Value{Value: 950}, Snapshot: &metricspb.SummaryValue_Snapshot{ PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ { @@ -937,8 +1233,8 @@ func verifyTarget3(t *testing.T, td *testData, mds []internaldata.MetricsData) { Timestamp: ts2, Value: &metricspb.Point_SummaryValue{ SummaryValue: &metricspb.SummaryValue{ - Sum: &wrappers.DoubleValue{Value: 1}, - Count: &wrappers.Int64Value{Value: 5}, + Sum: &wrappers.DoubleValue{Value: 101}, + Count: &wrappers.Int64Value{Value: 55}, Snapshot: &metricspb.SummaryValue_Snapshot{ PercentileValues: nil, }, @@ -1092,7 +1388,9 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) { // loop to validate outputs for each targets for _, target := range targets { - target.validateFunc(t, target, results[target.name]) + t.Run(target.name, func(t *testing.T) { + target.validateFunc(t, target, results[target.name]) + }) } } From c0f7d1d1264752bd60404007eb128c7ae1ac5b37 Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella 
Date: Wed, 28 Apr 2021 17:17:32 -0400 Subject: [PATCH 2/5] receiver/prometheus: update documentation regarding metric adjustment Signed-off-by: Anthony J Mirabella --- receiver/prometheusreceiver/DESIGN.md | 266 +++++++++++++----- .../internal/metrics_adjuster.go | 12 +- 2 files changed, 190 insertions(+), 88 deletions(-) diff --git a/receiver/prometheusreceiver/DESIGN.md b/receiver/prometheusreceiver/DESIGN.md index 9b04f60bc91..2afedecba8a 100644 --- a/receiver/prometheusreceiver/DESIGN.md +++ b/receiver/prometheusreceiver/DESIGN.md @@ -32,11 +32,11 @@ service. We shall be able to retain parity from the following two setups: ## Prometheus Text Format Overview -Prometheus text format is a line orient format. For each non-empty line, which -not begins with #, is a metric data point with includes a metric name and its +Prometheus text format is a line-oriented format. Each non-empty line, which +does not begin with #, is a metric data point that includes a metric name and its value, which is of float64 type, as well as some optional data such as tags and -timestamp, which is in milliseconds. For lines begin with #, they are either -comments, which need to be filtered, or metadata, which including type hints +timestamp, which is in milliseconds. For lines that begin with #, they are either +comments, which need to be filtered, or metadata, including type hints and units that are usually indicating the beginning of a new individual metric or a group of new metrics. More details of Prometheus text format can be found from its [official @@ -78,10 +78,10 @@ container_cpu_load_average_10s{id="/000-metadata",image="",name=""} 0 container_cpu_load_average_10s{id="/001-sysfs",image="",name=""} 0 ``` -The above example was taken from an cadvisor metric endpoint, the type hint +The above example was taken from a cadvisor metric endpoint, the type hint tells that the name of this metric group is `container_cpu_load_average_10s` -and it's of `gauge` type.
Then it follows by some individual metric points -which are of the same metric name. For each individual metric within this +and it's of `gauge` type. Then it is followed by some individual metric points +which have the same metric name. For each individual metric within this group, they share the same set of tag keys, with unique value sets. ## Prometheus Metric Scraper Anatomy @@ -94,10 +94,10 @@ receiver properly. ### Major components of Prometheus Scape package - **[ScapeManager](https://github.com/prometheus/prometheus/blob/v2.9.2/scrape/manager.go):** -the component which loads the scrape_config, and manage the scraping tasks +the component which loads the scrape_config, and manages the scraping tasks - **[ScrapePool](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/scrape.go#L154-L439):** -an object which manage scrapes for a sets of targets +an object which manages scrapes for a set of targets - **[Scraper](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/scrape.go#L506-L511):** a http client to fetch data from remote metrics endpoints @@ -112,8 +112,8 @@ a DFA style streaming decoder/parser for prometheus text format it is used to acquire a storage appender instance at the beginning of each scrapeLoop run - **[storage.Appender](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/storage/interface.go#L86-L95):** -an abstraction of the metric storage which can be a filesystem, a database or an remote endpoint...etc. As for OpenTelemetry prometheus receiver, this is -also the interface we need to implement to provide a customized storage appender which is backed by metrics sink. +an abstraction of the metric storage which can be a filesystem, a database or a remote endpoint...etc.
For the OpenTelemetry prometheus receiver, this is +also the interface we need to implement to provide a customized storage appender backed by a metrics sink. - **[ScrapeLoop](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/scrape.go#L586-L1024):** the actual scrape pipeline which performs the main scraping and ingestion logic. @@ -137,8 +137,9 @@ It basically does the following things in turn: ### The storage.Appender interface As discussed in the previous section, the storage.Appender is the most -important piece of components for us to implement to bring the two worlds +important component for us to implement to bring the two worlds together. It has a very simple interface which is defined below: + ```go type Appender interface { Add(l labels.Labels, t int64, v float64) (uint64, error) @@ -160,40 +161,39 @@ type Appender interface { One can see that the interface is very simple, it only has 4 methods: `Add`, `AddFast`, `Commit` and `Rollback`. The last two methods are easy to understand: `Commit` is called when the processing of the scraped page is -completed and success, whereas `Rollback` is called if error occurs in between +completed and successful, whereas `Rollback` is called if an error occurs during the process. -However for the two methods starting with 'Add', there's no document on the +However, for the two methods starting with 'Add', there's no document on the Prometheus project for how they should be used. By examining the scrapeLoop source code, as well as some storage.Appender implementations. It indicates that the first method `Add` is always used for the first time when a unique -metrics, which means the combination of metric name and its tags are unique, is -seen for the first time. The `Add` method can return a non-zero reference -number, then the scrapeLoop can cache this number with the metric's unique -signature. 
The next time, such as the next scrape cycle of the same target, -when the metric is seen again by matching its signature, it will call the -`AddFast` method with the cached reference number. This reference number might -make sense to databases which has unique key as numbers, however, in our use -case, it's not necessary, thus we can always return 0 ref number from the `Add` -method to skip this caching mechanism. +combination of metric name and tags is seen for the first time. The `Add` +method can return a non-zero reference number, then the scrapeLoop can cache +this number with the metric's signature. The next time, such as the next +scrape cycle of the same target, when the metric is seen again by matching +its signature, it will call the `AddFast` method with the cached reference +number. This reference number might make sense to databases which have unique +key as numbers, however, in our use case, it's not necessary, thus we can +always return 0 ref number from the `Add` method to skip this caching mechanism. ### Challenges and solutions -Even though the definition of this interface is very simple, however, to -implement it properly is a bit challenging, given that every time the -Add/AddFast method is called, it only provides the information about the -current data point, the context of what metric group this data point belonging -to is not provided, we have to keep track of it internally within the appender. -And this is not the whole story, there are a couple other issues we need to +Even though the definition of this interface is very simple, to +implement it properly is a bit challenging given that every time the +Add/AddFast method is called it only provides the information about the +current data point. The context of what metric group this data point belongs +to is not provided; we have to keep track of it internally within the appender. +This is not the whole story, there are a couple other issues we need to address, including: 1.
Have a way to link the Target with the current appender instance -The labels provided to the Add/AddFast methods dose not include some target -specified information such as `job name` which is important construct the [Node +The labels provided to the Add/AddFast methods do not include some target +specified information such as `job name` which is important in constructing the [Node proto](https://github.com/census-instrumentation/opencensus-proto/blob/e2601ef16f8a085a69d94ace5133f97438f8945f/src/opencensus/proto/agent/common/v1/common.proto#L36-L51) object of OpenTelemetry. The target object is not accessible from the Appender -interface, however, we can get it from the ScrapeManager, when designing the -appender, we need to have a way to inject the binding target into the appender +interface, however, we can get it from the ScrapeManager, so when designing the +appender we need to have a way to inject the binding target into the appender instance. 2. Group metrics from the same family together @@ -207,13 +207,13 @@ appender itself need to keep track of it. It's also important to know that for some special types such as `histogram` and `summary`, not all the data points have the same name, there are some special metric points has postfix like `_sum` and `_count`, we need to handle this properly, and do not consider this -is a metric family change. +as a metric family change. 3. Group complex metrics such as histogram together in proper order -In Prometheus, a single aggregated type of metric data such as `histogram` and +In Prometheus a single aggregated type of metric data such as `histogram` and `summary` is represented by multiple metric data points, such as buckets and -quantiles as well as the additional `_sum` and `_count` data. ScrapeLoop will +quantiles, as well as the additional `_sum` and `_count` data. ScrapeLoop will feed them into the appender individually. 
The appender needs to have a way to bundle them together to transform them into a single Metric Datapoint Proto object. @@ -226,24 +226,23 @@ of the same metric family before committing the metric family to the sink. 5. StartTimestamp and values of metrics of cumulative types -In OpenTelemetry, every metrics of cumulative type is required to have a -StartTimestamp, which records when a metric is first recorded, however, +In OpenTelemetry, every metric of cumulative type is required to have a +StartTimestamp, which records when a metric is first recorded. However, Prometheus does not provide such data. One of the solutions to tackle this problem is to cache the first observed value of these metrics as well as the timestamp, then for any subsequent data of the same metric, use the cached -timestamp as StartTimestamp and the delta with the first value as value. -However, metrics can come and go, or the remote server can restart at any given -time, the receiver also needs to take care of issues such as a new value is -smaller than the previous seen value, by considering it as a metrics with new -StartTime. +timestamp as StartTimestamp. Unfortunately, metrics can come and go, or the +remote server can restart at any given time, so the receiver also needs to +take care of issues such as when a new value is smaller than the previously +seen value, by considering it as a metric with a new StartTime. ## Prometheus Metric to OpenTelemetry Metric Proto Mapping ### Target as Node -The Target of Prometheus is defined by the scrape_config, it has the -information like `hostname` of the remote service, and a user defined `job -name` which can be used as the service name. These two piece of information -makes it a great fit to map it into the `Node` proto of the OpenTelemetry +The Target of Prometheus is defined by the scrape_config, it has information +such as the `hostname` of the remote service, and a user defined `job +name` that can be used as the service name. 
These two pieces of information +make it a great fit to map to the `Node` field of the OpenTelemetry MetricsData type, as shown below: ```go @@ -257,13 +256,13 @@ type MetricsData struct { The scrape page as a whole also can be fit into the above `MetricsData` data structure, and all the metrics data points can be stored with the `Metrics` array. We will explain the mappings of individual metric types in the following -couple sections +sections ### Metric Value Mapping - In OpenTelemetry, metrics value types can be either `int64` or `float64`, - while in Prometheus the value can be safely assumed it's always `float64` + In OpenTelemetry metrics value types can be either `int64` or `float64`, + while in Prometheus the value can be safely assumed to always be `float64` based on the [Prometheus Text Format - Document](https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details) + Document](https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details), as quoted below: > value is a float represented as required by Go's ParseFloat() function. @@ -281,7 +280,7 @@ Document](https://prometheus.io/docs/concepts/metric_types/#counter), > is a cumulative metric that represents a single monotonically increasing > counter whose value can only increase or be reset to zero on restart. -It is one of simplest metric types found in both systems, however, it is +It is one of the simplest metric types found in both systems, however, it is a cumulative type of metric. Consider what happens when we have two consecutive scrapes from a target, with the first one as shown below: ``` @@ -299,11 +298,10 @@ http_requests_total{method="post",code="200"} 1028 http_requests_total{method="post",code="400"} 5 ``` -The Prometheus Receiver will only produce one Metric from the 2nd scrape and -subsequent ones if any. The 1st scrape, however, is stored as metadata to -calculate a delta from. 
+The Prometheus Receiver stores previously seen scrape data as metadata to +attempt to identify value resets and to provide a start time for produced metrics. -The output of the 2nd scrape is as shown below: +The output of the first scrape is as shown below: ```go metrics := []*metricspb.Metric{ { @@ -316,14 +314,42 @@ metrics := []*metricspb.Metric{ StartTimestamp: startTimestamp, LabelValues: []*metricspb.LabelValue{{Value: "post", HasValue: true}, {Value: "200", HasValue: true}}, Points: []*metricspb.Point{ - {Timestamp: currentTimestamp, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + {Timestamp: startTimestamp, Value: &metricspb.Point_DoubleValue{DoubleValue: 1027.0}}, }, }, { StartTimestamp: startTimestamp, LabelValues: []*metricspb.LabelValue{{Value: "post", HasValue: false}, {Value: "400", HasValue: true}}, Points: []*metricspb.Point{ - {Timestamp: currentTimestamp, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + {Timestamp: startTimestamp, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, + }, + }, + }, + }, +} +``` + +The output of the second scrape is as shown below: +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_requests_total", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "method"}, {Key: "code"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: startTimestamp, + LabelValues: []*metricspb.LabelValue{{Value: "post", HasValue: true}, {Value: "200", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: currentTimestamp, Value: &metricspb.Point_DoubleValue{DoubleValue: 1028.0}}, + }, + }, + { + StartTimestamp: startTimestamp, + LabelValues: []*metricspb.LabelValue{{Value: "post", HasValue: false}, {Value: "400", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: currentTimestamp, Value: &metricspb.Point_DoubleValue{DoubleValue: 5.0}}, }, }, }, @@ -385,9 +411,9 @@ Histogram is a complex data 
type, in Prometheus, it uses multiple data points to represent a single histogram. Its description can be found from: [Prometheus Histogram](https://prometheus.io/docs/concepts/metric_types/#histogram). -Similar to counter, histogram is also a cumulative type metric, thus only the -2nd and subsequent scrapes can produce a metric for OpenTelemetry, with the -first scrape stored as metadata. +Similar to counter, histogram is also a cumulative type metric, so the receiver +will store metadata that can be used to detect resets and provide an appropriate +start timestamp for subsequent metrics. An example of histogram with first scrape response: ``` @@ -406,6 +432,58 @@ hist_test_count{t1="2"} 100.0 ``` +Its corresponding OpenTelemetry metrics will be: +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "hist_test", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: startTimestamp, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: startTimestamp, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 10, + Sum: 100.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 1}, {Count: 2}, {Count: 7}}, + }}}, + }, + }, + { + StartTimestamp: startTimestamp, + LabelValues: []*metricspb.LabelValue{{Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: startTimestamp, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: 
&metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 100, + Sum: 10000.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 10}, {Count: 20}, {Count: 70}}, + }}}, + }, + }, + }, + }, +} +``` + And a subsequent 2nd scrape response: ``` # HELP hist_test This is my histogram vec @@ -445,9 +523,9 @@ metrics := []*metricspb.Metric{ }, }, }, - Count: 3, - Sum: 50.0, - Buckets: []*metricspb.DistributionValue_Bucket{{Count: 1}, {Count: 2}, {Count: 0}}, + Count: 13, + Sum: 150.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 2}, {Count: 4}, {Count: 7}}, }}}, }, }, @@ -464,9 +542,9 @@ metrics := []*metricspb.Metric{ }, }, }, - Count: 0, - Sum: 0.0, - Buckets: []*metricspb.DistributionValue_Bucket{{Count: 0}, {Count: 0}, {Count: 0}}, + Count: 100, + Sum: 10000.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 10}, {Count: 20}, {Count: 70}}, }}}, }, }, @@ -484,8 +562,8 @@ into OpenTelemetry format, one needs to apply the following formula: CurrentOCBucketVlaue = CurrentPrometheusBucketValue - PrevPrometheusBucketValue ``` -OpenTelemetry does not use `+inf` as bound, one needs to remove it to generate -the Bounds of the OpenTelemetry Bounds. +OpenTelemetry does not use `+inf` as an explicit bound, one needs to remove it to generate +the Bounds of the OpenTelemetry distribution. Other than that, the `SumOfSquaredDeviation`, which is required by OpenTelemetry format for histogram, is not provided by Prometheus. We have to @@ -501,10 +579,9 @@ Same as histogram, summary is also a complex metric type which is represented by multiple data points. A detailed description can be found from [Prometheus Summary](https://prometheus.io/docs/concepts/metric_types/#summary) -The sum and count from Summary is also cumulative, however, the quantiles are -not. 
The receiver will still consider the first scrape as metadata, and won't -produce an output. For any subsequent scrapes, the count and sum will be deltas -from the first scrape, while the quantiles are left as it is. +The sum and count from Summary are cumulative, however, the quantiles are +not. The receiver will again maintain some state to attempt to detect value resets +and to set appropriate start timestamps. For the following two scrapes, with the first one: @@ -520,6 +597,40 @@ go_gc_duration_seconds_sum 17.391350544 go_gc_duration_seconds_count 52489 ``` +Its corresponding OpenTelemetry metrics will be: +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "go_gc_duration_seconds", + Type: metricspb.MetricDescriptor_SUMMARY, + LabelKeys: []*metricspb.LabelKey{}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: startTimestamp, + LabelValues: []*metricspb.LabelValue{}, + Points: []*metricspb.Point{ + {Timestamp: startTimestamp, Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: 17.391350544}, + Count: &wrappers.Int64Value{Value: 52489}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ + {Percentile: 0.0, Value: 0.0001271}, + {Percentile: 25.0, Value: 0.0002455}, + {Percentile: 50.0, Value: 0.0002904}, + {Percentile: 75.0, Value: 0.0003426}, + {Percentile: 100.0, Value: 0.0023638}, + }, + }}}}, + }, + }, + }, + }, +} + +``` + And the 2nd one: ``` # HELP go_gc_duration_seconds A summary of the GC invocation durations. 
@@ -533,8 +644,7 @@ go_gc_duration_seconds_sum 17.491350544 go_gc_duration_seconds_count 52490 ``` -The corresponding OpenTelemetry metrics is as shown below: - +Its corresponding OpenTelemetry metrics will be: ```go metrics := []*metricspb.Metric{ { @@ -549,8 +659,8 @@ metrics := []*metricspb.Metric{ Points: []*metricspb.Point{ {Timestamp: currentTimestamp, Value: &metricspb.Point_SummaryValue{ SummaryValue: &metricspb.SummaryValue{ - Sum: &wrappers.DoubleValue{Value: 0.1}, - Count: &wrappers.Int64Value{Value: 1}, + Sum: &wrappers.DoubleValue{Value: 17.491350544}, + Count: &wrappers.Int64Value{Value: 52490}, Snapshot: &metricspb.SummaryValue_Snapshot{ PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ {Percentile: 0.0, Value: 0.0001271}, @@ -568,7 +678,7 @@ metrics := []*metricspb.Metric{ ``` -There's also some differences between the two systems. One difference is that +There are also some differences between the two systems. One difference is that Prometheus uses `quantile`, while OpenTelemetry uses `percentile`. Additionally, OpenTelemetry has optional values for `Sum` and `Count` of a snapshot, however, they are not provided by Prometheus, and `nil` will be used diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index 90e3b7c6d83..8f03cc04f15 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -185,7 +185,7 @@ func (jm *JobsMap) get(job, instance string) *timeseriesMap { } // MetricsAdjuster takes a map from a metric instance to the initial point in the metrics instance -// and provides AdjustMetrics, which takes a sequence of metrics and adjust their values based on +// and provides AdjustMetrics, which takes a sequence of metrics and adjust their start times based on // the initial points. 
type MetricsAdjuster struct { tsm *timeseriesMap @@ -200,7 +200,7 @@ func NewMetricsAdjuster(tsm *timeseriesMap, logger *zap.Logger) *MetricsAdjuster } } -// AdjustMetrics takes a sequence of metrics and adjust their values based on the initial and +// AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and // previous points in the timeseriesMap. // Returns the total number of timeseries that had reset start times. func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) ([]*metricspb.Metric, int) { @@ -274,13 +274,6 @@ func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Ty return ma.adjustPoint(metricType, current[0], previous[0]) } -// Note: There is an important, subtle point here. When a new timeseries or a reset is detected, -// current and initial are the same object. When initial == previous, the previous value/count/sum -// are all the initial value. When initial != previous, the previous value/count/sum has been -// adjusted wrt the initial value so both they must be combined to find the actual previous -// value/count/sum. This happens because the timeseries are updated in-place - if new copies of the -// timeseries were created instead, previous could be used directly but this would mean reallocating -// all of the metrics. 
func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Type, current, previous *metricspb.Point) bool { switch metricType { @@ -298,7 +291,6 @@ func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Typ return false } case metricspb.MetricDescriptor_SUMMARY: - // note: for summary, we don't adjust the snapshot currentSummary := current.GetSummaryValue() previousSummary := previous.GetSummaryValue() if currentSummary.Count.GetValue() < previousSummary.Count.GetValue() || currentSummary.Sum.GetValue() < previousSummary.Sum.GetValue() { From da6f5f3afb7e63af6ac27819cec243d0fdf8e88d Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Wed, 28 Apr 2021 17:31:09 -0400 Subject: [PATCH 3/5] exporter/prometheus: fix e2e test to account for proper receiver functioning --- exporter/prometheusexporter/end_to_end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/prometheusexporter/end_to_end_test.go b/exporter/prometheusexporter/end_to_end_test.go index c0f0ed73fa4..16a086d4d86 100644 --- a/exporter/prometheusexporter/end_to_end_test.go +++ b/exporter/prometheusexporter/end_to_end_test.go @@ -144,7 +144,7 @@ func TestEndToEndSummarySupport(t *testing.T) { `test_jvm_gc_collection_seconds_sum.gc="G1 Old Generation". 0.*`, `test_jvm_gc_collection_seconds_count.gc="G1 Old Generation". 0.*`, `test_jvm_gc_collection_seconds_sum.gc="G1 Young Generation". 0.*`, - `test_jvm_gc_collection_seconds_count.gc="G1 Young Generation". 0.*`, + `test_jvm_gc_collection_seconds_count.gc="G1 Young Generation". 9.*`, `. HELP test_jvm_info JVM version info`, `. TYPE test_jvm_info gauge`, `test_jvm_info.vendor="Oracle Corporation",version="9.0.4.11". 
1.*`, From 18dd2979ccdb0a83ba1ba8d858b128eff8491ef9 Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Mon, 10 May 2021 11:52:40 -0400 Subject: [PATCH 4/5] Address PR feedback on docs and naming Signed-off-by: Anthony J Mirabella --- receiver/prometheusreceiver/DESIGN.md | 55 +++++++++---------- .../internal/metrics_adjuster.go | 12 ++-- .../internal/metrics_adjuster_test.go | 8 +-- 3 files changed, 36 insertions(+), 39 deletions(-) diff --git a/receiver/prometheusreceiver/DESIGN.md b/receiver/prometheusreceiver/DESIGN.md index 2afedecba8a..a0e9a2c31b5 100644 --- a/receiver/prometheusreceiver/DESIGN.md +++ b/receiver/prometheusreceiver/DESIGN.md @@ -142,45 +142,42 @@ together. It has a very simple interface which is defined below: ```go type Appender interface { - Add(l labels.Labels, t int64, v float64) (uint64, error) - - - AddFast(l labels.Labels, ref uint64, t int64, v float64) error - + Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) // Commit submits the collected samples and purges the batch. Commit() error - Rollback() error + + ExemplarAppender +} + +type ExemplarAppender interface { + AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) } ``` -*Note: the above code belongs to the Prometheus project, its license can be found [here](https://github.com/prometheus/prometheus/blob/v2.9.2/LICENSE)* - -One can see that the interface is very simple, it only has 4 methods: `Add`, -`AddFast`, `Commit` and `Rollback`. The last two methods are easy to -understand: `Commit` is called when the processing of the scraped page is -completed and successful, whereas `Rollback` is called if an error occurs during -the process. - -However, for the two methods starting with 'Add', there's no document on the -Prometheus project for how they should be used. By examining the scrapeLoop -source code, as well as some storage.Appender implementations. 
It indicates -that the first method `Add` is always used for the first time when a unique -combination of metric name and tags are seen for the first time. The `Add` -method can return a non-zero reference number, then the scrapeLoop can cache -this number with the metric's signature. The next time, such as the next -scrape cycle of the same target, when the metric is seen again by matching -its signature, it will call the `AddFast` method with the cached reference -number. This reference number might make sense to databases which have unique -key as numbers, however, in our use case, it's not necessary, thus we can -always return 0 ref number from the `Add` method to skip this caching mechanism. +*Note: the above code belongs to the Prometheus project, its license can be found [here](https://github.com/prometheus/prometheus/blob/v2.26.0/LICENSE)* + +One can see that the interface is very simple, it only has 4 methods (once we +account for the embedded `ExemplarAppender` interface): `Append`, `AppendExemplar`, +`Commit` and `Rollback`. The two lifecycle methods are easy to understand: `Commit` +is called when the processing of the scraped page is completed and successful, +whereas `Rollback` is called if an error occurs during the process. + +However, for the two methods starting with 'Append', the behavior is somewhat +more complicated. The documentation indicates that calls to 'Append' may return +an optional 'reference number' which may be used to add further samples in the +same or later transactions. A reference value of `0` is used to indicate that +no such caching should occur. The documentation indicates that current implementations +of `AppendExemplar` do not generate reference numbers and their doing so should +be considered erroneous and logged. In our system we do not generate any reference +numbers and always return `0` from `Append` and `AppendExemplar` to skip caching. 
### Challenges and solutions Even though the definition of this interface is very simple, to implement it properly is a bit challenging given that every time the -Add/AddFast method is called it only provides the information about the +Append/AppendExemplar method is called it only provides the information about the current data point. The context of what metric group this data point belonging to is not provided; we have to keep track of it internally within the appender. This is not the whole story, there are a couple other issues we need to @@ -188,7 +185,7 @@ address, including: 1. Have a way to link the Target with the current appender instance -The labels provided to the Add/AddFast methods do not include some target +The labels provided to the Append/AppendExemplar methods do not include some target specified information such as `job name` which is important in constructing the [Node proto](https://github.com/census-instrumentation/opencensus-proto/blob/e2601ef16f8a085a69d94ace5133f97438f8945f/src/opencensus/proto/agent/common/v1/common.proto#L36-L51) object of OpenTelemetry. The target object is not accessible from the Appender @@ -201,7 +198,7 @@ instance. In OpenTelemetry, metric points of the same name are usually grouped together as one timeseries but different data points. It's important for the appender to keep track of the metric family changes, and group metrics of the same family -together Keep in mind that the Add/AddFast method is operated in a streaming +together Keep in mind that the Append/AppendExemplar method is operated in a streaming manner, ScrapeLoop does not provide any direct hints on metric name change, the appender itself need to keep track of it. 
It's also important to know that for some special types such as `histogram` and `summary`, not all the data points diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index 8f03cc04f15..0f536da146c 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -205,15 +205,15 @@ func NewMetricsAdjuster(tsm *timeseriesMap, logger *zap.Logger) *MetricsAdjuster // Returns the total number of timeseries that had reset start times. func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) ([]*metricspb.Metric, int) { var adjusted = make([]*metricspb.Metric, 0, len(metrics)) - reset := 0 + resets := 0 ma.tsm.Lock() defer ma.tsm.Unlock() for _, metric := range metrics { d := ma.adjustMetric(metric) - reset += d + resets += d adjusted = append(adjusted, metric) } - return adjusted, reset + return adjusted, resets } // Returns the number of timeseries with reset start times. @@ -236,20 +236,20 @@ func (ma *MetricsAdjuster) adjustMetric(metric *metricspb.Metric) int { // Returns the number of timeseries that had reset start times. 
func (ma *MetricsAdjuster) adjustMetricTimeseries(metric *metricspb.Metric) int { - reset := 0 + resets := 0 filtered := make([]*metricspb.TimeSeries, 0, len(metric.GetTimeseries())) for _, current := range metric.GetTimeseries() { tsi := ma.tsm.get(metric, current.GetLabelValues()) if tsi.initial == nil || !ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, tsi.previous) { // initial || reset timeseries tsi.initial = current - reset++ + resets++ } tsi.previous = current filtered = append(filtered, current) } metric.Timeseries = filtered - return reset + return resets } // Returns true if 'current' was adjusted and false if 'current' is an the initial occurrence or a diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go index 277f61be251..48f397e98bd 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go @@ -401,7 +401,7 @@ type metricsAdjusterTest struct { description string metrics []*metricspb.Metric adjusted []*metricspb.Metric - reset int + resets int } func runScript(t *testing.T, tsm *timeseriesMap, script []*metricsAdjusterTest) { @@ -410,9 +410,9 @@ func runScript(t *testing.T, tsm *timeseriesMap, script []*metricsAdjusterTest) ma := NewMetricsAdjuster(tsm, l) for _, test := range script { - expectedDropped := test.reset - adjusted, dropped := ma.AdjustMetrics(test.metrics) + expectedResets := test.resets + adjusted, resets := ma.AdjustMetrics(test.metrics) assert.EqualValuesf(t, test.adjusted, adjusted, "Test: %v - expected: %v, actual: %v", test.description, test.adjusted, adjusted) - assert.Equalf(t, expectedDropped, dropped, "Test: %v", test.description) + assert.Equalf(t, expectedResets, resets, "Test: %v", test.description) } } From 68c3fb4073af837f00e5651410fb3ed8381323ec Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Wed, 12 May 2021 14:58:30 -0400 
Subject: [PATCH 5/5] rename adjustPoint->isReset Signed-off-by: Anthony J Mirabella --- receiver/prometheusreceiver/internal/metrics_adjuster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index 0f536da146c..ece858a9e8b 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -271,10 +271,10 @@ func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Ty zap.Int("len(current)", len(current)), zap.Int("len(initial)", len(initial)), zap.Int("len(previous)", len(previous))) return true } - return ma.adjustPoint(metricType, current[0], previous[0]) + return ma.isReset(metricType, current[0], previous[0]) } -func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Type, +func (ma *MetricsAdjuster) isReset(metricType metricspb.MetricDescriptor_Type, current, previous *metricspb.Point) bool { switch metricType { case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE: