Skip to content

Commit

Permalink
Cherrypick #5062 to 0.33.x: Use staleness markers generated by promet…
Browse files Browse the repository at this point in the history
…heus, rather than making our own (#3989)

* Revert "receiver/prometheus: glue and complete staleness marking for disappearing metrics (#3423)"

This reverts commit 8b79380.

* Revert "receiver/prometheus: add store to track stale metrics (#3414)"

This reverts commit cdc1634.

* stop dropping staleness markers from prometheus, and fix tests

* add staleness end to end test from #3423

* fix import grouping
  • Loading branch information
dashpole authored Sep 9, 2021
1 parent 8739b2b commit f81fd47
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 261 deletions.
15 changes: 2 additions & 13 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -60,13 +59,12 @@ type metricBuilder struct {
intervalStartTimeMs int64
logger *zap.Logger
currentMf MetricFamily
stalenessStore *stalenessStore
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore, intervalStartTimeMs int64) *metricBuilder {
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, intervalStartTimeMs int64) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
Expand All @@ -79,7 +77,6 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
droppedTimeseries: 0,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
intervalStartTimeMs: intervalStartTimeMs,
}
}
Expand All @@ -93,7 +90,7 @@ func (b *metricBuilder) matchStartTimeMetric(metricName string) bool {
}

// AddDataPoint is for feeding prometheus data complexValue in its processing order
func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr error) {
func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error {
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
Expand All @@ -111,14 +108,6 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr
return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels)
}

defer func() {
// Only mark this data point as in the current scrape
// iff it isn't a stale metric.
if rerr == nil && !value.IsStaleNaN(v) {
b.stalenessStore.markAsCurrentlySeen(ls, t)
}
}()

metricName := ls.Get(model.MetricNameLabel)
switch {
case metricName == "":
Expand Down
13 changes: 6 additions & 7 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), startTs)
b := newMetricBuilder(mc, true, "", testLogger, startTs)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand All @@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger, dummyStalenessStore(), 0)
testLogger, 0)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b := newMetricBuilder(mc, true, "", testLogger, 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b := newMetricBuilder(mc, true, "", testLogger, 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)
b := newMetricBuilder(mc, true, "", testLogger, 0)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down Expand Up @@ -1452,8 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0)

mb := newMetricBuilder(mc, true, "", testLogger, 0)
dupLabels := labels.Labels{
{Name: "__name__", Value: "test"},
{Name: "a", Value: "1"},
Expand Down
8 changes: 1 addition & 7 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ type OcaStore struct {
receiverID config.ComponentID
externalLabels labels.Labels

logger *zap.Logger
stalenessStore *stalenessStore
logger *zap.Logger
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
Expand All @@ -75,7 +74,6 @@ func NewOcaStore(
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
externalLabels: externalLabels,
stalenessStore: newStalenessStore(),
}
}

Expand All @@ -90,9 +88,6 @@ func (o *OcaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *OcaStore) Appender(context.Context) storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
// Firstly prepare the stalenessStore for a new scrape cyle.
o.stalenessStore.refresh()

return newTransaction(
o.ctx,
o.jobsMap,
Expand All @@ -103,7 +98,6 @@ func (o *OcaStore) Appender(context.Context) storage.Appender {
o.sink,
o.externalLabels,
o.logger,
o.stalenessStore,
)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
Expand Down
12 changes: 1 addition & 11 deletions receiver/prometheusreceiver/internal/otlp_metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"go.uber.org/zap"

"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -91,7 +90,7 @@ type metricBuilderPdata struct {
// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilderPdata {
func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger) *metricBuilderPdata {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
Expand All @@ -105,7 +104,6 @@ func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeM
droppedTimeseries: 0,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
},
}
}
Expand Down Expand Up @@ -133,14 +131,6 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64)
return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels)
}

defer func() {
// Only mark this data point as in the current scrape
// iff it isn't a stale metric.
if rerr == nil && !value.IsStaleNaN(v) {
b.stalenessStore.markAsCurrentlySeen(ls, t)
}
}()

metricName := ls.Get(model.MetricNameLabel)
switch {
case metricName == "":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,12 @@ receivers:
scrape_interval: 2ms
static_configs:
- targets: [%q]
processors:
batch:
exporters:
prometheusremotewrite:
endpoint: %q
insecure: true
service:
pipelines:
metrics:
Expand Down
118 changes: 0 additions & 118 deletions receiver/prometheusreceiver/internal/staleness_store.go

This file was deleted.

58 changes: 0 additions & 58 deletions receiver/prometheusreceiver/internal/staleness_store_test.go

This file was deleted.

Loading

0 comments on commit f81fd47

Please sign in to comment.