From 6addfa5f20097541c20e06789e95395efe103be6 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 22 Jul 2021 01:06:44 -0700 Subject: [PATCH] receiver/prometheus: add ToMetricPdata method This change wires together the logic to convert the pdata converters into a method ToMetricPdata that appends converted metrics. The next change or two, will then hook everything up directly and will allow us to delete all the prior code!! Updates #3137 Updates PR #3674 Requires PR #3694 Updates #3691 --- .../internal/otlp_metricfamily.go | 65 ++++++++++- .../internal/otlp_metricsbuilder.go | 102 +++++++++++++++++- 2 files changed, 165 insertions(+), 2 deletions(-) diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index 8eca8a395140..7168c7ee424f 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -24,6 +24,14 @@ import ( "go.opentelemetry.io/collector/model/pdata" ) +// MetricFamilyPdata is unit which is corresponding to the metrics items which shared the same TYPE/UNIT/... metadata from +// a single scrape. +type MetricFamilyPdata interface { + Add(metricName string, ls labels.Labels, t int64, v float64) error + IsSameFamily(metricName string) bool + ToMetricPdata(metrics *pdata.MetricSlice) (int, int) +} + type metricFamilyPdata struct { // We are composing the already present metricFamily to // make for a scalable migration, so that we only edit target @@ -44,7 +52,7 @@ type metricGroupPdata struct { family *metricFamilyPdata } -func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily { +func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamilyPdata { familyName := normalizeMetricName(metricName) // lookup metadata based on familyName @@ -255,3 +263,58 @@ func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v return nil } + +// getGroups to return groups in insertion order +func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata { + groups := make([]*metricGroupPdata, len(mf.groupOrders)) + for k, v := range mf.groupOrders { + groups[v] = mf.groups[k] + } + return groups +} + +func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) { + metric := pdata.NewMetric() + pointCount := 0 + + switch mf.mtype { + case pdata.MetricDataTypeHistogram: + histogram := metric.Histogram() + hdpL := histogram.DataPoints() + for _, mg := range mf.getGroups() { + if !mg.toDistributionPoint(mf.labelKeysOrdered, &hdpL) { + mf.droppedTimeseries++ + } + } + pointCount = hdpL.Len() + + case pdata.MetricDataTypeSummary: + summary := metric.Summary() + sdpL := summary.DataPoints() + for _, mg := range mf.getGroups() { + if !mg.toSummaryPoint(mf.labelKeysOrdered, &sdpL) { + mf.droppedTimeseries++ + } + } + pointCount = sdpL.Len() + + default: + gauge := metric.Gauge() + gdpL := gauge.DataPoints() + for _, mg := range mf.getGroups() { + if !mg.toNumberDataPoint(mf.labelKeysOrdered, &gdpL) { + mf.droppedTimeseries++ + } + } + pointCount = gdpL.Len() + } + + if pointCount == 0 { + return mf.droppedTimeseries, mf.droppedTimeseries + } + + metric.CopyTo(metrics.AppendEmpty()) + + // note: the total number of points is the number of points+droppedTimeseries. + return pointCount + mf.droppedTimeseries, mf.droppedTimeseries +} diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go index 8cc9f0e2d807..6acc6770dcaa 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go @@ -15,13 +15,17 @@ package internal import ( + "fmt" + "regexp" + "sort" "strconv" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" - + "github.com/prometheus/prometheus/pkg/value" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" ) func isUsefulLabelPdata(mType pdata.MetricDataType, labelKey string) bool { @@ -76,3 +80,99 @@ func convToPdataMetricType(metricType textparse.MetricType) pdata.MetricDataType return pdata.MetricDataTypeNone } } + +type metricBuilderPdata struct { + *metricBuilder + metrics pdata.MetricSlice + currentMf MetricFamilyPdata +} + +// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus +// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object +// by calling its Build function +func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilderPdata { + var regex *regexp.Regexp + if startTimeMetricRegex != "" { + regex, _ = regexp.Compile(startTimeMetricRegex) + } + return &metricBuilderPdata{ + metrics: pdata.NewMetricSlice(), + metricBuilder: &metricBuilder{ + mc: mc, + logger: logger, + numTimeseries: 0, + droppedTimeseries: 0, + useStartTimeMetric: useStartTimeMetric, + startTimeMetricRegex: regex, + stalenessStore: stalenessStore, + }, + } +} + +// AddDataPoint is for feeding prometheus data complexValue in its processing order +func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr error) { + // Any datapoint with duplicate labels MUST be rejected per: + // * https://github.com/open-telemetry/wg-prometheus/issues/44 + // * https://github.com/open-telemetry/opentelemetry-collector/issues/3407 + // as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13. + seen := make(map[string]bool) + dupLabels := make([]string, 0, len(ls)) + for _, label := range ls { + if _, ok := seen[label.Name]; ok { + dupLabels = append(dupLabels, label.Name) + } + seen[label.Name] = true + } + if len(dupLabels) != 0 { + sort.Strings(dupLabels) + return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels) + } + + defer func() { + // Only mark this data point as in the current scrape + // iff it isn't a stale metric. + if rerr == nil && !value.IsStaleNaN(v) { + b.stalenessStore.markAsCurrentlySeen(ls, t) + } + }() + + metricName := ls.Get(model.MetricNameLabel) + switch { + case metricName == "": + b.numTimeseries++ + b.droppedTimeseries++ + return errMetricNameNotFound + case isInternalMetric(metricName): + b.hasInternalMetric = true + lm := ls.Map() + // See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series + // up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed. + if metricName == scrapeUpMetricName && v != 1.0 { + if v == 0.0 { + b.logger.Warn("Failed to scrape Prometheus endpoint", + zap.Int64("scrape_timestamp", t), + zap.String("target_labels", fmt.Sprintf("%v", lm))) + } else { + b.logger.Warn("The 'up' metric contains invalid value", + zap.Float64("value", v), + zap.Int64("scrape_timestamp", t), + zap.String("target_labels", fmt.Sprintf("%v", lm))) + } + } + case b.useStartTimeMetric && b.matchStartTimeMetric(metricName): + b.startTime = v + } + + b.hasData = true + + if b.currentMf != nil && !b.currentMf.IsSameFamily(metricName) { + ts, dts := b.currentMf.ToMetricPdata(&b.metrics) + b.numTimeseries += ts + b.droppedTimeseries += dts + b.currentMf = newMetricFamilyPdata(metricName, b.mc) + } else if b.currentMf == nil { + b.currentMf = newMetricFamilyPdata(metricName, b.mc) + } + + return b.currentMf.Add(metricName, ls, t, v) +}