Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receiver/prometheus: use actual interval startTimeMs for cumulative types #3694

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ type MetricFamily interface {
}

type metricFamily struct {
name string
mtype metricspb.MetricDescriptor_Type
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
groups map[string]*metricGroup
name string
mtype metricspb.MetricDescriptor_Type
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
groups map[string]*metricGroup
intervalStartTimeMs int64
}

func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamily {
func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -73,15 +74,16 @@ func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) Me
}

return &metricFamily{
name: familyName,
mtype: ocaMetricType,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
name: familyName,
mtype: ocaMetricType,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
intervalStartTimeMs: intervalStartTimeMs,
}
}

Expand Down Expand Up @@ -164,10 +166,11 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
Expand Down Expand Up @@ -279,15 +282,16 @@ type dataPoint struct {
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroup struct {
family *metricFamily
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
family *metricFamily
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
intervalStartTimeMs int64
}

func (mg *metricGroup) sortPoints() {
Expand Down Expand Up @@ -388,9 +392,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri
var startTs *timestamppb.Timestamp
// gauge/undefined types has no start time
if mg.family.isCumulativeType() {
// TODO(@odeke-em): use the actual interval start time as reported in
// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
startTs = timestampFromMs(mg.ts)
startTs = timestampFromMs(mg.intervalStartTimeMs)
}

return &metricspb.TimeSeries{
Expand Down
8 changes: 5 additions & 3 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type metricBuilder struct {
useStartTimeMetric bool
startTimeMetricRegex *regexp.Regexp
startTime float64
intervalStartTimeMs int64
logger *zap.Logger
currentMf MetricFamily
stalenessStore *stalenessStore
Expand All @@ -65,7 +66,7 @@ type metricBuilder struct {
// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder {
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore, intervalStartTimeMs int64) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
Expand All @@ -79,6 +80,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
intervalStartTimeMs: intervalStartTimeMs,
}
}

Expand Down Expand Up @@ -153,9 +155,9 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr
if m != nil {
b.metrics = append(b.metrics, m)
}
b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
} else if b.currentMf == nil {
b.currentMf = newMetricFamily(metricName, b.mc, b.logger)
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
}

return b.currentMf.Add(metricName, ls, t, v)
Expand Down
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), startTs)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand All @@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger, dummyStalenessStore())
testLogger, dummyStalenessStore(), 0)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down Expand Up @@ -1452,7 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)

dupLabels := labels.Labels{
{Name: "__name__", Value: "test"},
Expand Down
28 changes: 14 additions & 14 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type metricGroupPdata struct {
family *metricFamilyPdata
}

func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -66,13 +66,14 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily {
mtype: convToPdataMetricType(metadata.Type),
groups: make(map[string]*metricGroupPdata),
metricFamily: metricFamily{
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
intervalStartTimeMs: intervalStartTimeMs,
},
}
}
Expand Down Expand Up @@ -177,9 +178,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p
tsNanos := pdata.Timestamp(mg.ts * 1e6)
// gauge/undefined types have no start time.
if mg.family.isCumulativeTypePdata() {
// TODO(@odeke-em): use the actual interval start time as reported in
// https://github.com/open-telemetry/opentelemetry-collector/issues/3691
startTsNanos = tsNanos
startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6)
}

point := dest.AppendEmpty()
Expand Down Expand Up @@ -213,9 +212,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg = &metricGroupPdata{
family: mf,
metricGroup: metricGroup{
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
},
}
mf.groups[groupKey] = mg
Expand Down
Loading