Skip to content

Commit

Permalink
receiver/prometheus: use actual interval startTimeMs for cumulative t…
Browse files Browse the repository at this point in the history
…ypes

With this change, we now infer the actual interval startTime
for cumulative types from the original starttime interval reinforcing
what the OpenTelemetry Proto recommendations say in
https://github.com/open-telemetry/opentelemetry-proto/blob/bc8ee79d8e01faf3310af2987268e94285f354da/opentelemetry/proto/metrics/v1/metrics.proto#L132-L140

Fixes open-telemetry#3691
  • Loading branch information
odeke-em committed Jul 29, 2021
1 parent f804c9c commit f7cee29
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 93 deletions.
72 changes: 37 additions & 35 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ type MetricFamily interface {
}

type metricFamily struct {
name string
mtype metricspb.MetricDescriptor_Type
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
groups map[string]*metricGroup
name string
mtype metricspb.MetricDescriptor_Type
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
groups map[string]*metricGroup
intervalStartTimeMs int64
}

func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamily {
func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -73,15 +74,16 @@ func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) Me
}

return &metricFamily{
name: familyName,
mtype: ocaMetricType,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
name: familyName,
mtype: ocaMetricType,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
intervalStartTimeMs: intervalStartTimeMs,
}
}

Expand Down Expand Up @@ -164,10 +166,11 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
Expand Down Expand Up @@ -279,15 +282,16 @@ type dataPoint struct {
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroup struct {
family *metricFamily
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
family *metricFamily
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
intervalStartTimeMs int64
}

func (mg *metricGroup) sortPoints() {
Expand Down Expand Up @@ -388,9 +392,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri
var startTs *timestamppb.Timestamp
// gauge/undefined types has no start time
if mg.family.isCumulativeType() {
// TODO(@odeke-em): use the actual interval start time as reported in
// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
startTs = timestampFromMs(mg.ts)
startTs = timestampFromMs(mg.intervalStartTimeMs)
}

return &metricspb.TimeSeries{
Expand Down
8 changes: 5 additions & 3 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type metricBuilder struct {
useStartTimeMetric bool
startTimeMetricRegex *regexp.Regexp
startTime float64
intervalStartTimeMs int64
logger *zap.Logger
currentMf MetricFamily
stalenessStore *stalenessStore
Expand All @@ -65,7 +66,7 @@ type metricBuilder struct {
// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder {
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore, intervalStartTimeMs int64) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
Expand All @@ -79,6 +80,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
intervalStartTimeMs: intervalStartTimeMs,
}
}

Expand Down Expand Up @@ -153,9 +155,9 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr
if m != nil {
b.metrics = append(b.metrics, m)
}
b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
} else if b.currentMf == nil {
b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
}

return b.currentMf.Add(metricName, ls, t, v)
Expand Down
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), startTs)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand All @@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger, dummyStalenessStore())
testLogger, dummyStalenessStore(), 0)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down Expand Up @@ -1452,7 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)

dupLabels := labels.Labels{
{Name: "__name__", Value: "test"},
Expand Down
28 changes: 14 additions & 14 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type metricGroupPdata struct {
family *metricFamilyPdata
}

func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -66,13 +66,14 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
mtype: convToPdataMetricType(metadata.Type),
groups: make(map[string]*metricGroupPdata),
metricFamily: metricFamily{
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
intervalStartTimeMs: intervalStartTimeMs,
},
}
}
Expand Down Expand Up @@ -177,9 +178,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p
tsNanos := pdata.Timestamp(mg.ts * 1e6)
// gauge/undefined types have no start time.
if mg.family.isCumulativeTypePdata() {
// TODO(@odeke-em): use the actual interval start time as reported in
// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
startTsNanos = tsNanos
startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6)
}

point := dest.AppendEmpty()
Expand Down Expand Up @@ -213,9 +212,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg = &metricGroupPdata{
family: mf,
metricGroup: metricGroup{
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
},
}
mf.groups[groupKey] = mg
Expand Down
Loading

0 comments on commit f7cee29

Please sign in to comment.