Skip to content

Commit

Permalink
receiver/prometheus: add ToMetricPdata method
Browse files Browse the repository at this point in the history
This change wires the pdata converters together into
a ToMetricPdata method that appends the converted
metrics. The next change or two will then hook
everything up directly and will allow us
to delete all the prior code!!

Updates #3137
Updates PR open-telemetry#3674
Requires PR open-telemetry#3694
Updates open-telemetry#3691
  • Loading branch information
odeke-em committed Aug 2, 2021
1 parent 6c72394 commit 8e005a1
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 1 deletion.
75 changes: 74 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// MetricFamilyPdata is the unit that corresponds to the metric items sharing the same
// TYPE/UNIT/... metadata within a single scrape.
type MetricFamilyPdata interface {
// Add feeds one scraped sample (metric name, label set, timestamp and value) into
// the family. A non-nil error means the sample was not accepted.
Add(metricName string, ls labels.Labels, t int64, v float64) error
// IsSameFamily reports whether metricName belongs to this family. The builder uses
// it to decide when the current family is complete and a new one must be started.
IsSameFamily(metricName string) bool
// ToMetricPdata converts the family and appends the result to metrics. It returns
// the total number of timeseries (converted plus dropped) and the number of
// dropped timeseries, in that order.
ToMetricPdata(metrics *pdata.MetricSlice) (int, int)
}

type metricFamilyPdata struct {
// We are composing the already present metricFamily to
// make for a scalable migration, so that we only edit target
Expand All @@ -44,7 +52,7 @@ type metricGroupPdata struct {
family *metricFamilyPdata
}

func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily {
func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamilyPdata {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand Down Expand Up @@ -254,3 +262,68 @@ func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v

return nil
}

// getGroups returns the family's groups as a slice arranged in insertion order:
// each group is placed at the index recorded for its key in mf.groupOrders.
func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata {
	ordered := make([]*metricGroupPdata, len(mf.groupOrders))
	for key, position := range mf.groupOrders {
		group := mf.groups[key]
		ordered[position] = group
	}
	return ordered
}

// ToMetricPdata converts every group of this family into data points of a single
// pdata.Metric and, if at least one point was produced, appends that metric to
// metrics.
//
// The data type is chosen from mf.mtype: histogram, summary and sum map to their
// pdata counterparts; every other type is emitted as a gauge. Groups that fail to
// convert are counted in mf.droppedTimeseries.
//
// It returns the total number of timeseries (converted points plus dropped ones)
// and the number of dropped timeseries, in that order.
//
// NOTE(review): only the data type and the data points are populated here; the
// metric's name, description and unit are not set by this method — confirm
// whether they need to be set before the metric is appended.
func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) {
	metric := pdata.NewMetric()
	groups := mf.getGroups()
	pointCount := 0

	// pdata's typed accessors (Histogram, Summary, Sum, Gauge) panic unless the
	// metric's data type has first been set to the matching type, so every branch
	// sets it explicitly before touching the data points.
	switch mf.mtype {
	case pdata.MetricDataTypeHistogram:
		metric.SetDataType(pdata.MetricDataTypeHistogram)
		hdpL := metric.Histogram().DataPoints()
		for _, mg := range groups {
			if !mg.toDistributionPoint(mf.labelKeysOrdered, &hdpL) {
				mf.droppedTimeseries++
			}
		}
		pointCount = hdpL.Len()

	case pdata.MetricDataTypeSummary:
		metric.SetDataType(pdata.MetricDataTypeSummary)
		sdpL := metric.Summary().DataPoints()
		for _, mg := range groups {
			if !mg.toSummaryPoint(mf.labelKeysOrdered, &sdpL) {
				mf.droppedTimeseries++
			}
		}
		pointCount = sdpL.Len()

	case pdata.MetricDataTypeSum:
		metric.SetDataType(pdata.MetricDataTypeSum)
		sdpL := metric.Sum().DataPoints()
		for _, mg := range groups {
			if !mg.toNumberDataPoint(mf.labelKeysOrdered, &sdpL) {
				mf.droppedTimeseries++
			}
		}
		pointCount = sdpL.Len()

	default:
		// Everything else is emitted as a gauge, so the type is forced to Gauge
		// here rather than copied from mf.mtype.
		metric.SetDataType(pdata.MetricDataTypeGauge)
		gdpL := metric.Gauge().DataPoints()
		for _, mg := range groups {
			if !mg.toNumberDataPoint(mf.labelKeysOrdered, &gdpL) {
				mf.droppedTimeseries++
			}
		}
		pointCount = gdpL.Len()
	}

	// Nothing was converted: do not append an empty metric; every timeseries seen
	// was dropped, so both counts equal droppedTimeseries.
	if pointCount == 0 {
		return mf.droppedTimeseries, mf.droppedTimeseries
	}

	metric.CopyTo(metrics.AppendEmpty())

	// note: the total number of points is the number of points+droppedTimeseries.
	return pointCount + mf.droppedTimeseries, mf.droppedTimeseries
}
105 changes: 105 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package internal

import (
"fmt"
"regexp"
"sort"
"strconv"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"go.uber.org/zap"

"go.opentelemetry.io/collector/model/pdata"
)
Expand Down Expand Up @@ -76,3 +81,103 @@ func convToPdataMetricType(metricType textparse.MetricType) pdata.MetricDataType
return pdata.MetricDataTypeNone
}
}

// metricBuilderPdata is the pdata-producing counterpart of metricBuilder. It reuses
// the existing builder's state through embedding and adds pdata-specific output.
type metricBuilderPdata struct {
// Embedded builder holding the shared state (metadata cache, logger, timeseries
// counters, start-time settings and staleness store).
*metricBuilder
// metrics collects the converted metrics; completed families are appended to it.
metrics pdata.MetricSlice
// currentMf is the metric family currently receiving data points; nil until the
// first data point has been added.
currentMf MetricFamilyPdata
}

// newMetricBuilderPdata creates a metricBuilderPdata that is fed the data points of a
// single Prometheus scraped page through AddDataPoint and accumulates the converted
// metrics in a pdata.MetricSlice.
//
// mc, useStartTimeMetric, logger and stalenessStore are stored as-is on the embedded
// metricBuilder. startTimeMetricRegex, when non-empty, is compiled and stored; if it
// does not compile, a warning is logged and the builder is created without a regex,
// exactly as if the pattern had been empty.
func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilderPdata {
	var regex *regexp.Regexp
	if startTimeMetricRegex != "" {
		compiled, err := regexp.Compile(startTimeMetricRegex)
		if err == nil {
			regex = compiled
		} else if logger != nil {
			// The signature cannot report an error, so surface the bad pattern in
			// the logs instead of discarding it silently.
			logger.Warn("Failed to compile start time metric regex",
				zap.String("regex", startTimeMetricRegex),
				zap.Error(err))
		}
	}

	// The timeseries counters are left at their zero values.
	return &metricBuilderPdata{
		metrics: pdata.NewMetricSlice(),
		metricBuilder: &metricBuilder{
			mc:                   mc,
			logger:               logger,
			useStartTimeMetric:   useStartTimeMetric,
			startTimeMetricRegex: regex,
			stalenessStore:       stalenessStore,
		},
	}
}

// The constructor and AddDataPoint are only wired up by follow-up changes. These
// blank assignments reference them so that golangci-lint does not report them as
// unused in the meantime.
var _ = newMetricBuilderPdata
var _ = (*metricBuilderPdata)(nil).AddDataPoint

// AddDataPoint feeds one Prometheus sample (label set, timestamp t, value v) into the
// builder, in the order the samples are processed.
//
// It returns an error when the label set contains duplicate label names, when the
// sample has no metric name (errMetricNameNotFound; the sample is counted as
// dropped), or when the current metric family rejects the sample. When the sample's
// metric name belongs to a different family than the current one, the current family
// is converted and appended to b.metrics before a new family is started.
//
// On success, a sample whose value is not a staleness marker is recorded in the
// staleness store as seen in the current scrape.
func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr error) {
	// Any datapoint with duplicate labels MUST be rejected per:
	// * https://github.com/open-telemetry/wg-prometheus/issues/44
	// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
	// as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13.
	//
	// dupLabels is only allocated when a duplicate is actually found, so the common
	// (valid) case pays for the set alone.
	seen := make(map[string]struct{}, len(ls))
	var dupLabels []string
	for _, label := range ls {
		if _, ok := seen[label.Name]; ok {
			dupLabels = append(dupLabels, label.Name)
			continue
		}
		seen[label.Name] = struct{}{}
	}
	if len(dupLabels) != 0 {
		sort.Strings(dupLabels)
		return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels)
	}

	defer func() {
		// Only mark this data point as in the current scrape
		// iff it isn't a stale metric.
		if rerr == nil && !value.IsStaleNaN(v) {
			b.stalenessStore.markAsCurrentlySeen(ls, t)
		}
	}()

	metricName := ls.Get(model.MetricNameLabel)
	switch {
	case metricName == "":
		b.numTimeseries++
		b.droppedTimeseries++
		return errMetricNameNotFound
	case isInternalMetric(metricName):
		b.hasInternalMetric = true
		// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
		// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
		if metricName == scrapeUpMetricName && v != 1.0 {
			// The label map is only needed for the log line, so build it here
			// rather than for every internal metric.
			targetLabels := fmt.Sprintf("%v", ls.Map())
			if v == 0.0 {
				b.logger.Warn("Failed to scrape Prometheus endpoint",
					zap.Int64("scrape_timestamp", t),
					zap.String("target_labels", targetLabels))
			} else {
				b.logger.Warn("The 'up' metric contains invalid value",
					zap.Float64("value", v),
					zap.Int64("scrape_timestamp", t),
					zap.String("target_labels", targetLabels))
			}
		}
	case b.useStartTimeMetric && b.matchStartTimeMetric(metricName):
		b.startTime = v
	}

	b.hasData = true

	switch {
	case b.currentMf == nil:
		// First data point: start the first family.
		b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs)
	case !b.currentMf.IsSameFamily(metricName):
		// The family changed: flush the finished one, account for its timeseries,
		// then start a new family for this metric.
		ts, dts := b.currentMf.ToMetricPdata(&b.metrics)
		b.numTimeseries += ts
		b.droppedTimeseries += dts
		b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs)
	}

	return b.currentMf.Add(metricName, ls, t, v)
}

0 comments on commit 8e005a1

Please sign in to comment.