From f30e2ec7d0ef73136af822c4d6e340a5e457ae88 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 16 Aug 2021 18:45:19 -0700 Subject: [PATCH] Translate time in parity with old code --- .../internal/otlp_metricfamily.go | 14 +- .../internal/otlp_transaction.go | 23 +- .../metrics_receiver_test.go | 272 +++++++++--------- 3 files changed, 172 insertions(+), 137 deletions(-) diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index 4959aba2f32..65b47442725 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -18,6 +18,7 @@ import ( "fmt" "sort" "strings" + "time" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" @@ -174,7 +175,7 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest point.SetSum(mg.sum) point.SetBucketCounts(bucketCounts) // The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds. - tsNanos := pdata.Timestamp(mg.ts * 1e6) + tsNanos := timestampFromMs(mg.ts) point.SetStartTimestamp(tsNanos) point.SetTimestamp(tsNanos) populateLabelValuesPdata(orderedLabelKeys, mg.ls, point.LabelsMap()) @@ -182,6 +183,11 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest return true } +func timestampFromMs(timeAtMs int64) pdata.Timestamp { + secs, ns := timeAtMs/1e3, (timeAtMs%1e3)*1e6 + return pdata.TimestampFromTime(time.Unix(secs, ns)) +} + func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdata.SummaryDataPointSlice) bool { // expecting count to be provided, however, in the following two cases, they can be missed. // 1. data is corrupted @@ -205,7 +211,7 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdat // observations and the corresponding sum is a sum of all observed values, thus the sum and count used // at the global level of the metricspb.SummaryValue // The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds. - tsNanos := pdata.Timestamp(mg.ts * 1e6) + tsNanos := timestampFromMs(mg.ts) point.SetStartTimestamp(tsNanos) point.SetTimestamp(tsNanos) point.SetSum(mg.sum) @@ -217,10 +223,10 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdat func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *pdata.NumberDataPointSlice) bool { var startTsNanos pdata.Timestamp - tsNanos := pdata.Timestamp(mg.ts * 1e6) + tsNanos := timestampFromMs(mg.ts) // gauge/undefined types have no start time. if mg.family.isCumulativeTypePdata() { - startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6) + startTsNanos = timestampFromMs(mg.intervalStartTimeMs) } point := dest.AppendEmpty() diff --git a/receiver/prometheusreceiver/internal/otlp_transaction.go b/receiver/prometheusreceiver/internal/otlp_transaction.go index 961f900fb8a..7b903896883 100644 --- a/receiver/prometheusreceiver/internal/otlp_transaction.go +++ b/receiver/prometheusreceiver/internal/otlp_transaction.go @@ -19,6 +19,7 @@ import ( "errors" "math" "sync/atomic" + "time" "go.uber.org/zap" @@ -159,6 +160,7 @@ func (t *transactionPdata) Commit() error { for _, sEntry := range staleLabels { t.metricBuilder.AddDataPoint(sEntry.labels, sEntry.seenAtMs, stalenessSpecialValue) } + t.startTimeMs = -1 ctx := t.obsrecv.StartMetricsOp(t.ctx) @@ -195,28 +197,43 @@ func (t *transactionPdata) Rollback() error { return nil } +func timestampFromFloat64(ts float64) pdata.Timestamp { + secs := int64(ts) + nanos := int64((ts - float64(secs)) * 1e9) + return pdata.TimestampFromTime(time.Unix(secs, nanos)) +} + func adjustStartTimestampPdata(startTime float64, metricsL *pdata.MetricSlice) { + startTimeTs := timestampFromFloat64(startTime) for i := 0; i < metricsL.Len(); i++ { metric := metricsL.At(i) switch metric.DataType() { - case pdata.MetricDataTypeGauge, pdata.MetricDataTypeHistogram: + case pdata.MetricDataTypeGauge: continue case pdata.MetricDataTypeSum: dataPoints := metric.Sum().DataPoints() for i := 0; i < dataPoints.Len(); i++ { dataPoint := dataPoints.At(i) - dataPoint.SetStartTimestamp(pdata.Timestamp(startTime)) + dataPoint.SetStartTimestamp(startTimeTs) } case pdata.MetricDataTypeSummary: dataPoints := metric.Summary().DataPoints() for i := 0; i < dataPoints.Len(); i++ { dataPoint := dataPoints.At(i) - dataPoint.SetStartTimestamp(pdata.Timestamp(startTime)) + dataPoint.SetStartTimestamp(startTimeTs) + } + + case pdata.MetricDataTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + dataPoint := dataPoints.At(i) + dataPoint.SetStartTimestamp(startTimeTs) } default: + panic("Unknown type:: " + metric.DataType().String()) } } } diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index d91881ca47f..5e210d97757 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -1157,150 +1157,150 @@ rpc_duration_seconds_count{foo="no_quantile"} 55 ` func verifyTarget3(t *testing.T, td *testData, mds []*pdata.ResourceMetrics) { - // TODO: Translate me. - /* - verifyNumScrapeResults(t, td, mds) - m1 := mds[0] - // m1 has 3 metrics + 5 internal scraper metrics - if l := len(m1.Metrics); l != 8 { - t.Errorf("want 8, but got %v\n", l) - } + // TODO: Translate me. + /* + verifyNumScrapeResults(t, td, mds) + m1 := mds[0] + // m1 has 3 metrics + 5 internal scraper metrics + if l := len(m1.Metrics); l != 8 { + t.Errorf("want 8, but got %v\n", l) + } - ts1 := m1.Metrics[1].Timeseries[0].Points[0].Timestamp - want1 := &agentmetricspb.ExportMetricsServiceRequest{ - Node: td.node, - Resource: td.resource, - } + ts1 := m1.Metrics[1].Timeseries[0].Points[0].Timestamp + want1 := &agentmetricspb.ExportMetricsServiceRequest{ + Node: td.node, + Resource: td.resource, + } - e1 := []testExpectation{ - assertMetricPresent("go_threads", - []descriptorComparator{ - compareMetricType(metricspb.MetricDescriptor_GAUGE_DOUBLE), - }, - []seriesExpectation{ - { - points: []pointComparator{ - comparePointTimestamp(ts1), - compareDoubleVal(18), - }, + e1 := []testExpectation{ + assertMetricPresent("go_threads", + []descriptorComparator{ + compareMetricType(metricspb.MetricDescriptor_GAUGE_DOUBLE), }, - }), - assertMetricPresent("http_request_duration_seconds", - []descriptorComparator{ - compareMetricType(metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION), - }, - []seriesExpectation{ - { - series: []seriesComparator{ - compareSeriesTimestamp(ts1), - }, - points: []pointComparator{ - comparePointTimestamp(ts1), - compareHistogram(13003, 50000, []int64{10000, 1000, 1001, 1002}), + []seriesExpectation{ + { + points: []pointComparator{ + comparePointTimestamp(ts1), + compareDoubleVal(18), + }, }, + }), + assertMetricPresent("http_request_duration_seconds", + []descriptorComparator{ + compareMetricType(metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION), }, - }), - assertMetricAbsent("corrupted_hist"), - assertMetricPresent("rpc_duration_seconds", - []descriptorComparator{ - compareMetricType(metricspb.MetricDescriptor_SUMMARY), - compareMetricLabelKeys([]string{"foo"}), - }, - []seriesExpectation{ - { - series: []seriesComparator{ - compareSeriesTimestamp(ts1), - compareSeriesLabelValues([]string{"bar"}), - }, - points: []pointComparator{ - comparePointTimestamp(ts1), - compareSummary(900, 8000, map[float64]float64{1: 31, 5: 35, 50: 47, 90: 70, 99: 76}), + []seriesExpectation{ + { + series: []seriesComparator{ + compareSeriesTimestamp(ts1), + }, + points: []pointComparator{ + comparePointTimestamp(ts1), + compareHistogram(13003, 50000, []int64{10000, 1000, 1001, 1002}), + }, }, + }), + assertMetricAbsent("corrupted_hist"), + assertMetricPresent("rpc_duration_seconds", + []descriptorComparator{ + compareMetricType(metricspb.MetricDescriptor_SUMMARY), + compareMetricLabelKeys([]string{"foo"}), }, - { - series: []seriesComparator{ - compareSeriesTimestamp(ts1), - compareSeriesLabelValues([]string{"no_quantile"}), + []seriesExpectation{ + { + series: []seriesComparator{ + compareSeriesTimestamp(ts1), + compareSeriesLabelValues([]string{"bar"}), + }, + points: []pointComparator{ + comparePointTimestamp(ts1), + compareSummary(900, 8000, map[float64]float64{1: 31, 5: 35, 50: 47, 90: 70, 99: 76}), + }, }, - points: []pointComparator{ - comparePointTimestamp(ts1), - compareSummary(50, 100, map[float64]float64{}), + { + series: []seriesComparator{ + compareSeriesTimestamp(ts1), + compareSeriesLabelValues([]string{"no_quantile"}), + }, + points: []pointComparator{ + comparePointTimestamp(ts1), + compareSummary(50, 100, map[float64]float64{}), + }, }, - }, - }), - } + }), + } - doCompare("scrape1", t, want1, m1, e1) + doCompare("scrape1", t, want1, m1, e1) - // verify the 2nd metricData - m2 := mds[1] - ts2 := m2.Metrics[0].Timeseries[0].Points[0].Timestamp + // verify the 2nd metricData + m2 := mds[1] + ts2 := m2.Metrics[0].Timeseries[0].Points[0].Timestamp - want2 := &agentmetricspb.ExportMetricsServiceRequest{ - Node: td.node, - Resource: td.resource, - } + want2 := &agentmetricspb.ExportMetricsServiceRequest{ + Node: td.node, + Resource: td.resource, + } - e2 := []testExpectation{ - assertMetricPresent("go_threads", - []descriptorComparator{ - compareMetricType(metricspb.MetricDescriptor_GAUGE_DOUBLE), - }, - []seriesExpectation{ - { - points: []pointComparator{ - comparePointTimestamp(ts2), - compareDoubleVal(16), - }, + e2 := []testExpectation{ + assertMetricPresent("go_threads", + []descriptorComparator{ + compareMetricType(metricspb.MetricDescriptor_GAUGE_DOUBLE), }, - }), - assertMetricPresent("http_request_duration_seconds", - []descriptorComparator{ - compareMetricType(metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION), - }, - []seriesExpectation{ - { - series: []seriesComparator{ - compareSeriesTimestamp(ts1), - }, - points: []pointComparator{ - comparePointTimestamp(ts2), - compareHistogram(14003, 50100, []int64{11000, 1000, 1001, 1002}), + []seriesExpectation{ + { + points: []pointComparator{ + comparePointTimestamp(ts2), + compareDoubleVal(16), + }, }, + }), + assertMetricPresent("http_request_duration_seconds", + []descriptorComparator{ + compareMetricType(metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION), }, - }), - assertMetricAbsent("corrupted_hist"), - assertMetricPresent("rpc_duration_seconds", - []descriptorComparator{ - compareMetricType(metricspb.MetricDescriptor_SUMMARY), - compareMetricLabelKeys([]string{"foo"}), - }, - []seriesExpectation{ - { - series: []seriesComparator{ - compareSeriesTimestamp(ts1), - compareSeriesLabelValues([]string{"bar"}), - }, - points: []pointComparator{ - comparePointTimestamp(ts2), - compareSummary(950, 8100, map[float64]float64{1: 32, 5: 35, 50: 47, 90: 70, 99: 77}), + []seriesExpectation{ + { + series: []seriesComparator{ + compareSeriesTimestamp(ts1), + }, + points: []pointComparator{ + comparePointTimestamp(ts2), + compareHistogram(14003, 50100, []int64{11000, 1000, 1001, 1002}), + }, }, + }), + assertMetricAbsent("corrupted_hist"), + assertMetricPresent("rpc_duration_seconds", + []descriptorComparator{ + compareMetricType(metricspb.MetricDescriptor_SUMMARY), + compareMetricLabelKeys([]string{"foo"}), }, - { - series: []seriesComparator{ - compareSeriesTimestamp(ts1), - compareSeriesLabelValues([]string{"no_quantile"}), + []seriesExpectation{ + { + series: []seriesComparator{ + compareSeriesTimestamp(ts1), + compareSeriesLabelValues([]string{"bar"}), + }, + points: []pointComparator{ + comparePointTimestamp(ts2), + compareSummary(950, 8100, map[float64]float64{1: 32, 5: 35, 50: 47, 90: 70, 99: 77}), + }, }, - points: []pointComparator{ - comparePointTimestamp(ts2), - compareSummary(55, 101, map[float64]float64{}), + { + series: []seriesComparator{ + compareSeriesTimestamp(ts1), + compareSeriesLabelValues([]string{"no_quantile"}), + }, + points: []pointComparator{ + comparePointTimestamp(ts2), + compareSummary(55, 101, map[float64]float64{}), + }, }, - }, - }), - } + }), + } - doCompare("scrape2", t, want2, m2, e2) - */ + doCompare("scrape2", t, want2, m2, e2) + */ } // TestEndToEnd end to end test executor @@ -1372,8 +1372,21 @@ process_start_time_seconds 400.8 var startTimeMetricPageStartTimestamp = ×tamppb.Timestamp{Seconds: 971, Nanos: 800000000} -// 6 metrics + 5 internal metrics -const numStartTimeMetricPageTimeseries = 11 +// 5 scraped metrics + 5 internal metrics where: +// 5 scraped metrics: +// * go_threads +// * http_requests_total +// * http_request_duration_seconds +// * rpc_duration_seconds +// * process_start_time +// +// 5 internal metrics: +// * scrape_duration_seconds +// * scrape_samples_post_metric_relabeling +// * scrape_samples_scraped +// * scrape_series_added +// * up +const numStartTimeMetricPageTimeseries = 10 // TestStartTimeMetric validates that timeseries have start time set to 'process_start_time_seconds' func TestStartTimeMetric(t *testing.T) { @@ -1522,7 +1535,6 @@ func verifyStartTimeMetricPage(t *testing.T, _ *testData, rmL []*pdata.ResourceM metrics := ilm.Metrics() for j := 0; j < metrics.Len(); j++ { metric := metrics.At(j) - t.Logf("METRIC: %q :: %q", metric.Name(), metric.DataType()) timestamp := startTimeMetricPageStartTimestamp numMetrics++ switch dataType := metric.DataType(); dataType { @@ -1531,7 +1543,7 @@ func verifyStartTimeMetricPage(t *testing.T, _ *testData, rmL []*pdata.ResourceM dataPoints := metric.Gauge().DataPoints() for k := 0; k < dataPoints.Len(); k++ { point := dataPoints.At(k) - assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()) + assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()+" :: "+metric.DataType().String()) } case pdata.MetricDataTypeHistogram: @@ -1539,14 +1551,14 @@ func verifyStartTimeMetricPage(t *testing.T, _ *testData, rmL []*pdata.ResourceM dataPoints := metric.Histogram().DataPoints() for k := 0; k < dataPoints.Len(); k++ { point := dataPoints.At(k) - assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()) + assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()+" :: "+metric.DataType().String()) } case pdata.MetricDataTypeSummary: dataPoints := metric.Summary().DataPoints() for k := 0; k < dataPoints.Len(); k++ { point := dataPoints.At(k) - assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()) + assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()+" :: "+metric.DataType().String()) } case pdata.MetricDataTypeSum: @@ -1554,7 +1566,7 @@ func verifyStartTimeMetricPage(t *testing.T, _ *testData, rmL []*pdata.ResourceM dataPoints := metric.Sum().DataPoints() for k := 0; k < dataPoints.Len(); k++ { point := dataPoints.At(k) - assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()) + assert.Equal(t, timestamp.AsTime(), point.StartTimestamp().AsTime(), metric.Name()+" :: "+metric.DataType().String()) } default: