From f7cee29b34c8d7f2e89850dbbaee2b342cdbc4c4 Mon Sep 17 00:00:00 2001
From: Emmanuel T Odeke
Date: Wed, 21 Jul 2021 19:05:29 -0700
Subject: [PATCH] receiver/prometheus: use actual interval startTimeMs for cumulative types

With this change, we now derive the start timestamp for cumulative types from
the actual scrape interval start time, reinforcing the OpenTelemetry Proto
recommendations in
https://github.com/open-telemetry/opentelemetry-proto/blob/bc8ee79d8e01faf3310af2987268e94285f354da/opentelemetry/proto/metrics/v1/metrics.proto#L132-L140

Fixes #3691
---
 .../internal/metricfamily.go           |  72 +++++++------
 .../internal/metricsbuilder.go         |   8 +-
 .../internal/metricsbuilder_test.go    |  12 +--
 .../internal/otlp_metricfamily.go      |  28 ++---
 .../internal/otlp_metricfamily_test.go | 101 ++++++++++++------
 .../internal/transaction.go            |   2 +-
 6 files changed, 130 insertions(+), 93 deletions(-)

diff --git a/receiver/prometheusreceiver/internal/metricfamily.go b/receiver/prometheusreceiver/internal/metricfamily.go
index b735bb0fcfb..9898a0b9f49 100644
--- a/receiver/prometheusreceiver/internal/metricfamily.go
+++ b/receiver/prometheusreceiver/internal/metricfamily.go
@@ -37,18 +37,19 @@ type MetricFamily interface {
 }
 
 type metricFamily struct {
-	name              string
-	mtype             metricspb.MetricDescriptor_Type
-	mc                MetadataCache
-	droppedTimeseries int
-	labelKeys         map[string]bool
-	labelKeysOrdered  []string
-	metadata          *scrape.MetricMetadata
-	groupOrders       map[string]int
-	groups            map[string]*metricGroup
+	name                string
+	mtype               metricspb.MetricDescriptor_Type
+	mc                  MetadataCache
+	droppedTimeseries   int
+	labelKeys           map[string]bool
+	labelKeysOrdered    []string
+	metadata            *scrape.MetricMetadata
+	groupOrders         map[string]int
+	groups              map[string]*metricGroup
+	intervalStartTimeMs int64
 }
 
-func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamily {
+func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamily {
 	familyName := normalizeMetricName(metricName)
 
 	// lookup metadata based on familyName
@@ -73,15 +74,16 @@ func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) Me
 	}
 
 	return &metricFamily{
-		name:              familyName,
-		mtype:             ocaMetricType,
-		mc:                mc,
-		droppedTimeseries: 0,
-		labelKeys:         make(map[string]bool),
-		labelKeysOrdered:  make([]string, 0),
-		metadata:          &metadata,
-		groupOrders:       make(map[string]int),
-		groups:            make(map[string]*metricGroup),
+		name:                familyName,
+		mtype:               ocaMetricType,
+		mc:                  mc,
+		droppedTimeseries:   0,
+		labelKeys:           make(map[string]bool),
+		labelKeysOrdered:    make([]string, 0),
+		metadata:            &metadata,
+		groupOrders:         make(map[string]int),
+		groups:              make(map[string]*metricGroup),
+		intervalStartTimeMs: intervalStartTimeMs,
 	}
 }
 
@@ -164,10 +166,11 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label
 	mg, ok := mf.groups[groupKey]
 	if !ok {
 		mg = &metricGroup{
-			family:       mf,
-			ts:           ts,
-			ls:           ls,
-			complexValue: make([]*dataPoint, 0),
+			family:              mf,
+			ts:                  ts,
+			ls:                  ls,
+			complexValue:        make([]*dataPoint, 0),
+			intervalStartTimeMs: mf.intervalStartTimeMs,
 		}
 		mf.groups[groupKey] = mg
 		// maintaining data insertion order is helpful to generate stable/reproducible metric output
@@ -279,15 +282,16 @@ type dataPoint struct {
 // a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
 // simple types like counter and gauge, each data point is a group of itself
 type metricGroup struct {
-	family       *metricFamily
-	ts           int64
-	ls           labels.Labels
-	count        float64
-	hasCount     bool
-	sum          float64
-	hasSum       bool
-	value        float64
-	complexValue []*dataPoint
+	family              *metricFamily
+	ts                  int64
+	ls                  labels.Labels
+	count               float64
+	hasCount            bool
+	sum                 float64
+	hasSum              bool
+	value               float64
+	complexValue        []*dataPoint
+	intervalStartTimeMs int64
 }
 
 func (mg *metricGroup) sortPoints() {
@@ -388,9 +392,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri
 	var startTs *timestamppb.Timestamp
 	// gauge/undefined types has no start time
 	if mg.family.isCumulativeType() {
-		// TODO(@odeke-em): use the actual interval start time as reported in
-		// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
-		startTs = timestampFromMs(mg.ts)
+		startTs = timestampFromMs(mg.intervalStartTimeMs)
 	}
 
 	return &metricspb.TimeSeries{
diff --git a/receiver/prometheusreceiver/internal/metricsbuilder.go b/receiver/prometheusreceiver/internal/metricsbuilder.go
index 7210db9b41d..600be819dff 100644
--- a/receiver/prometheusreceiver/internal/metricsbuilder.go
+++ b/receiver/prometheusreceiver/internal/metricsbuilder.go
@@ -57,6 +57,7 @@ type metricBuilder struct {
 	useStartTimeMetric   bool
 	startTimeMetricRegex *regexp.Regexp
 	startTime            float64
+	intervalStartTimeMs  int64
 	logger               *zap.Logger
 	currentMf            MetricFamily
 	stalenessStore       *stalenessStore
@@ -65,7 +66,7 @@ type metricBuilder struct {
 // newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
 // scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
 // by calling its Build function
-func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder {
+func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore, intervalStartTimeMs int64) *metricBuilder {
 	var regex *regexp.Regexp
 	if startTimeMetricRegex != "" {
 		regex, _ = regexp.Compile(startTimeMetricRegex)
@@ -79,6 +80,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
 		useStartTimeMetric:   useStartTimeMetric,
 		startTimeMetricRegex: regex,
 		stalenessStore:       stalenessStore,
+		intervalStartTimeMs:  intervalStartTimeMs,
 	}
 }
 
@@ -153,9 +155,9 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr
 		if m != nil {
 			b.metrics = append(b.metrics, m)
 		}
-		b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
+		b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
 	} else if b.currentMf == nil {
-		b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
+		b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
 	}
 
 	return b.currentMf.Add(metricName, ls, t, v)
diff --git a/receiver/prometheusreceiver/internal/metricsbuilder_test.go b/receiver/prometheusreceiver/internal/metricsbuilder_test.go
index 68bb9b70d39..d398ace689a 100644
--- a/receiver/prometheusreceiver/internal/metricsbuilder_test.go
+++ b/receiver/prometheusreceiver/internal/metricsbuilder_test.go
@@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
 			mc := newMockMetadataCache(testMetadata)
 			st := startTs
 			for i, page := range tt.inputs {
-				b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
+				b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), startTs)
 				b.startTime = defaultBuilderStartTime // set to a non-zero value
 				for _, pt := range page.pts {
 					// set ts for testing
@@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
 			st := startTs
 			for _, page := range tt.inputs {
 				b := newMetricBuilder(mc, true, startTimeMetricRegex,
-					testLogger, dummyStalenessStore())
+					testLogger, dummyStalenessStore(), 0)
 				b.startTime = defaultBuilderStartTime // set to a non-zero value
 				for _, pt := range page.pts {
 					// set ts for testing
@@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
 func Test_metricBuilder_baddata(t *testing.T) {
 	t.Run("empty-metric-name", func(t *testing.T) {
 		mc := newMockMetadataCache(testMetadata)
-		b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
+		b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
 		b.startTime = 1.0 // set to a non-zero value
 		if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
 			t.Error("expecting errMetricNameNotFound error, but get nil")
@@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {
 
 	t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
 		mc := newMockMetadataCache(testMetadata)
-		b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
+		b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
 		b.startTime = 1.0 // set to a non-zero value
 		if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
 			t.Error("expecting errEmptyBoundaryLabel error, but get nil")
@@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {
 
 	t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
 		mc := newMockMetadataCache(testMetadata)
-		b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
+		b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
 		b.startTime = 1.0 // set to a non-zero value
 		if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
 			t.Error("expecting errEmptyBoundaryLabel error, but get nil")
@@ -1452,7 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
 // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
 func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
 	mc := newMockMetadataCache(testMetadata)
-	mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
+	mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
 
 	dupLabels := labels.Labels{
 		{Name: "__name__", Value: "test"},
diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go
index f1ad9fcbb63..dcff1e9ff90 100644
--- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go
+++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go
@@ -44,7 +44,7 @@ type metricGroupPdata struct {
 	family *metricFamilyPdata
 }
 
-func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
+func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily {
 	familyName := normalizeMetricName(metricName)
 
 	// lookup metadata based on familyName
@@ -66,13 +66,14 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
 		mtype:  convToPdataMetricType(metadata.Type),
 		groups: make(map[string]*metricGroupPdata),
 		metricFamily: metricFamily{
-			name:              familyName,
-			mc:                mc,
-			droppedTimeseries: 0,
-			labelKeys:         make(map[string]bool),
-			labelKeysOrdered:  make([]string, 0),
-			metadata:          &metadata,
-			groupOrders:       make(map[string]int),
+			name:                familyName,
+			mc:                  mc,
+			droppedTimeseries:   0,
+			labelKeys:           make(map[string]bool),
+			labelKeysOrdered:    make([]string, 0),
+			metadata:            &metadata,
+			groupOrders:         make(map[string]int),
+			intervalStartTimeMs: intervalStartTimeMs,
 		},
 	}
 }
@@ -177,9 +178,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p
 	tsNanos := pdata.Timestamp(mg.ts * 1e6)
 	// gauge/undefined types have no start time.
 	if mg.family.isCumulativeTypePdata() {
-		// TODO(@odeke-em): use the actual interval start time as reported in
-		// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
-		startTsNanos = tsNanos
+		startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6)
 	}
 
 	point := dest.AppendEmpty()
@@ -213,9 +212,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
 		mg = &metricGroupPdata{
 			family: mf,
 			metricGroup: metricGroup{
-				ts:           ts,
-				ls:           ls,
-				complexValue: make([]*dataPoint, 0),
+				ts:                  ts,
+				ls:                  ls,
+				complexValue:        make([]*dataPoint, 0),
+				intervalStartTimeMs: mf.intervalStartTimeMs,
 			},
 		}
 		mf.groups[groupKey] = mg
diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
index f789918c91b..b5c0d2c0c42 100644
--- a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
+++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
@@ -94,8 +94,8 @@ func TestIsCumulativeEquivalence(t *testing.T) {
 	for _, tt := range tests {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
-			mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
-			mfp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily)
+			mfp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
 			assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative")
 			assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily")
 			assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily")
@@ -110,14 +110,18 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
 		metric string
 	}
 	tests := []struct {
-		name    string
-		labels  labels.Labels
-		scrapes []*scrape
-		want    func() pdata.HistogramDataPoint
+		name                string
+		metricName          string
+		labels              labels.Labels
+		scrapes             []*scrape
+		want                func() pdata.HistogramDataPoint
+		intervalStartTimeMs int64
 	}{
 		{
-			name:   "histogram",
-			labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
+			name:                "histogram with startTimestamp of 11",
+			metricName:          "histogram",
+			intervalStartTimeMs: 1717,
+			labels:              labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
 			scrapes: []*scrape{
 				{at: 11, value: 10, metric: "histogram_count"},
 				{at: 11, value: 1004.78, metric: "histogram_sum"},
@@ -142,7 +146,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
 	for _, tt := range tests {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
-			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mp := newMetricFamilyPdata(tt.metricName, mc, tt.intervalStartTimeMs).(*metricFamilyPdata)
 			for _, tv := range tt.scrapes {
 				require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
 			}
@@ -183,11 +187,12 @@ func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) {
 		},
 	}
 
-	for _, tt := range tests {
+	for i, tt := range tests {
 		tt := tt
+		intervalStartTimeMs := int64(i + 1)
 		t.Run(tt.name, func(t *testing.T) {
-			mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
-			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily)
+			mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata)
 			for _, tv := range tt.scrapes {
 				require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
 				require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
@@ -350,7 +355,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
 	for _, tt := range tests {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
-			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
 			for _, lbs := range tt.labelsScrapes {
 				for _, scrape := range lbs.scrapes {
 					require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
@@ -400,8 +405,8 @@ func TestMetricGroupData_toSummaryPointEquivalence(t *testing.T) {
 	for _, tt := range tests {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
-			mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
-			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily)
+			mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
 			for _, tv := range tt.scrapes {
 				require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
 				require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
@@ -453,22 +458,45 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
 		metric string
 	}
 	tests := []struct {
-		name    string
-		labels  labels.Labels
-		scrapes []*scrape
-		want    func() pdata.NumberDataPoint
+		name                     string
+		metricKind               string
+		labels                   labels.Labels
+		scrapes                  []*scrape
+		intervalStartTimestampMs int64
+		want                     func() pdata.NumberDataPoint
 	}{
 		{
-			name:   "counter",
-			labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "b", Value: "B"}},
+			metricKind:               "counter",
+			name:                     "counter:: startTimestampMs of 11",
+			intervalStartTimestampMs: 11,
+			labels:                   labels.Labels{{Name: "a", Value: "A"}, {Name: "b", Value: "B"}},
 			scrapes: []*scrape{
-				{at: 38, value: 39.9, metric: "value"},
+				{at: 13, value: 33.7, metric: "value"},
 			},
 			want: func() pdata.NumberDataPoint {
 				point := pdata.NewNumberDataPoint()
-				point.SetDoubleVal(39.9)
-				point.SetTimestamp(38 * 1e6) // the time in milliseconds -> nanoseconds.
-				point.SetStartTimestamp(38 * 1e6)
+				point.SetDoubleVal(33.7)
+				point.SetTimestamp(13 * 1e6) // the time in milliseconds -> nanoseconds.
+				point.SetStartTimestamp(11 * 1e6)
+				labelsMap := point.LabelsMap()
+				labelsMap.Insert("a", "A")
+				labelsMap.Insert("b", "B")
+				return point
+			},
+		},
+		{
+			name:                     "counter:: startTimestampMs of 0",
+			metricKind:               "counter",
+			intervalStartTimestampMs: 0,
+			labels:                   labels.Labels{{Name: "a", Value: "A"}, {Name: "b", Value: "B"}},
+			scrapes: []*scrape{
+				{at: 28, value: 99.9, metric: "value"},
+			},
+			want: func() pdata.NumberDataPoint {
+				point := pdata.NewNumberDataPoint()
+				point.SetDoubleVal(99.9)
+				point.SetTimestamp(28 * 1e6) // the time in milliseconds -> nanoseconds.
+				point.SetStartTimestamp(0)
 				labelsMap := point.LabelsMap()
 				labelsMap.Insert("a", "A")
 				labelsMap.Insert("b", "B")
 				return point
 			},
@@ -480,7 +508,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
 	for _, tt := range tests {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
-			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mp := newMetricFamilyPdata(tt.metricKind, mc, tt.intervalStartTimestampMs).(*metricFamilyPdata)
 			for _, tv := range tt.scrapes {
 				require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
 			}
@@ -506,9 +534,10 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) {
 		metric string
 	}
 	tests := []struct {
-		name    string
-		labels  labels.Labels
-		scrapes []*scrape
+		name      string
+		labels    labels.Labels
+		scrapes   []*scrape
+		wantValue float64
 	}{
 		{
 			name:   "counter",
@@ -516,14 +545,16 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) {
 			scrapes: []*scrape{
 				{at: 13, value: 33.7, metric: "value"},
 			},
+			wantValue: 33.7,
 		},
 	}
 
-	for _, tt := range tests {
+	for i, tt := range tests {
 		tt := tt
+		intervalStartTimeMs := int64(11 + i)
 		t.Run(tt.name, func(t *testing.T) {
-			mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily)
-			mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata)
+			mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily)
+			mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata)
 			for _, tv := range tt.scrapes {
 				require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
 				require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
@@ -537,9 +568,11 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) {
 			ocPoint := ocTimeseries.Points[0]
 			pdataPoint := ddpL.At(0)
 			// 1. Ensure that the startTimestamps are equal.
-			require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "The timestamp must be equal")
+			require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.StartTimestamp().AsTime(), "The timestamp must be equal")
+			require.Equal(t, intervalStartTimeMs*1e6, pdataPoint.StartTimestamp().AsTime().UnixNano(), "intervalStartTimeMs must be the same")
 			// 2. Ensure that the value is equal.
-			require.Equal(t, ocPoint.GetDoubleValue(), pdataPoint.DoubleVal(), "Count must be equal")
+			require.Equal(t, ocPoint.GetDoubleValue(), pdataPoint.DoubleVal(), "Values must be equal")
+			require.Equal(t, tt.wantValue, pdataPoint.DoubleVal(), "Values must be equal")
 			// 4. Ensure that the point's timestamp is equal to that from the OpenCensusProto data point.
 			require.Equal(t, ocPoint.GetTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "Point timestamps must be equal")
 			// 5. Ensure that the labels all match up.
diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go
index 48cc5479627..90449c0e9cf 100644
--- a/receiver/prometheusreceiver/internal/transaction.go
+++ b/receiver/prometheusreceiver/internal/transaction.go
@@ -166,7 +166,7 @@ func (tr *transaction) initTransaction(ls labels.Labels) error {
 		tr.instance = instance
 	}
 	tr.node, tr.resource = createNodeAndResource(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
-	tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger, tr.stalenessStore, tr.startTimeMs)
+	tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger, tr.stalenessStore, tr.startTimeMs)
 	tr.isNew = false
 	return nil
 }
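
Reviewer note (illustration only, not part of the patch): the minimal Go sketch below uses stand-in types rather than the receiver's actual metricGroup/pdata API to show the behavior this change introduces — a cumulative point's start timestamp now comes from the scrape interval's start time (milliseconds converted to nanoseconds, as in `mg.intervalStartTimeMs * 1e6`), instead of being copied from the sample's own timestamp. It mirrors the "counter:: startTimestampMs of 11" test case, where the sample is scraped at 13 ms but the reported start timestamp is 11 ms.

package main

import "fmt"

// metricGroup is a stand-in for the receiver's metric group; the field names
// echo the patch, but this is a sketch under assumptions, not the real type.
type metricGroup struct {
	tsMs                int64 // sample timestamp, in milliseconds
	intervalStartTimeMs int64 // scrape interval start, in milliseconds
	cumulative          bool  // counters, histograms, summaries
}

// startTimestampNs echoes `startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6)`:
// cumulative types get the interval start; gauge/undefined types carry no start time.
func (mg metricGroup) startTimestampNs() int64 {
	if !mg.cumulative {
		return 0
	}
	return mg.intervalStartTimeMs * 1e6 // milliseconds -> nanoseconds
}

func main() {
	mg := metricGroup{tsMs: 13, intervalStartTimeMs: 11, cumulative: true}
	// Before this patch the start timestamp equaled the sample timestamp (13 * 1e6);
	// with the patch it is the interval start (11 * 1e6).
	fmt.Println(mg.startTimestampNs()) // 11000000
}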