Skip to content

Commit

Permalink
receiver/prometheus: use actual interval startTimeMs for cumulative t…
Browse files Browse the repository at this point in the history
…ypes (#3694)

With this change, we now infer the actual interval startTime
for cumulative types from the original starttime interval reinforcing
what the OpenTelemetry Proto recommendations say in
https://github.com/open-telemetry/opentelemetry-proto/blob/bc8ee79d8e01faf3310af2987268e94285f354da/opentelemetry/proto/metrics/v1/metrics.proto#L132-L140

Fixes #3691
  • Loading branch information
odeke-em authored Aug 2, 2021
1 parent 6b4a70b commit b328e13
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 93 deletions.
72 changes: 37 additions & 35 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ type MetricFamily interface {
}

type metricFamily struct {
name string
mtype metricspb.MetricDescriptor_Type
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
groups map[string]*metricGroup
name string
mtype metricspb.MetricDescriptor_Type
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
groups map[string]*metricGroup
intervalStartTimeMs int64
}

func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamily {
func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -73,15 +74,16 @@ func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) Me
}

return &metricFamily{
name: familyName,
mtype: ocaMetricType,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
name: familyName,
mtype: ocaMetricType,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
intervalStartTimeMs: intervalStartTimeMs,
}
}

Expand Down Expand Up @@ -164,10 +166,11 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
Expand Down Expand Up @@ -279,15 +282,16 @@ type dataPoint struct {
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroup struct {
family *metricFamily
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
family *metricFamily
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
intervalStartTimeMs int64
}

func (mg *metricGroup) sortPoints() {
Expand Down Expand Up @@ -388,9 +392,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri
var startTs *timestamppb.Timestamp
// gauge/undefined types has no start time
if mg.family.isCumulativeType() {
// TODO(@odeke-em): use the actual interval start time as reported in
// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
startTs = timestampFromMs(mg.ts)
startTs = timestampFromMs(mg.intervalStartTimeMs)
}

return &metricspb.TimeSeries{
Expand Down
8 changes: 5 additions & 3 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type metricBuilder struct {
useStartTimeMetric bool
startTimeMetricRegex *regexp.Regexp
startTime float64
intervalStartTimeMs int64
logger *zap.Logger
currentMf MetricFamily
stalenessStore *stalenessStore
Expand All @@ -65,7 +66,7 @@ type metricBuilder struct {
// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder {
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore, intervalStartTimeMs int64) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
Expand All @@ -79,6 +80,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
intervalStartTimeMs: intervalStartTimeMs,
}
}

Expand Down Expand Up @@ -153,9 +155,9 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr
if m != nil {
b.metrics = append(b.metrics, m)
}
b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
} else if b.currentMf == nil {
b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
}

return b.currentMf.Add(metricName, ls, t, v)
Expand Down
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), startTs)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand All @@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger, dummyStalenessStore())
testLogger, dummyStalenessStore(), 0)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down Expand Up @@ -1452,7 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)

dupLabels := labels.Labels{
{Name: "__name__", Value: "test"},
Expand Down
28 changes: 14 additions & 14 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type metricGroupPdata struct {
family *metricFamilyPdata
}

func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -66,13 +66,14 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
mtype: convToPdataMetricType(metadata.Type),
groups: make(map[string]*metricGroupPdata),
metricFamily: metricFamily{
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
intervalStartTimeMs: intervalStartTimeMs,
},
}
}
Expand Down Expand Up @@ -177,9 +178,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p
tsNanos := pdata.Timestamp(mg.ts * 1e6)
// gauge/undefined types have no start time.
if mg.family.isCumulativeTypePdata() {
// TODO(@odeke-em): use the actual interval start time as reported in
// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
startTsNanos = tsNanos
startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6)
}

point := dest.AppendEmpty()
Expand Down Expand Up @@ -213,9 +212,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg = &metricGroupPdata{
family: mf,
metricGroup: metricGroup{
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
},
}
mf.groups[groupKey] = mg
Expand Down
Loading

0 comments on commit b328e13

Please sign in to comment.