Skip to content

Commit

Permalink
Cherrypick #5062 to 0.30.x: Use staleness markers generated by promet…
Browse files Browse the repository at this point in the history
…heus, rather than making our own (#3991)

* Revert "receiver/prometheus: glue and complete staleness marking for disappearing metrics (#3423)"

This reverts commit 8b79380.

* Revert "receiver/prometheus: add store to track stale metrics (#3414)"

This reverts commit cdc1634.

* stop dropping staleness markers from prometheus, and fix tests

* add staleness end to end test from #3423

* fix import grouping
  • Loading branch information
dashpole authored Sep 9, 2021
1 parent 7983af7 commit 0741d82
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 257 deletions.
15 changes: 2 additions & 13 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -59,13 +58,12 @@ type metricBuilder struct {
startTime float64
logger *zap.Logger
currentMf MetricFamily
stalenessStore *stalenessStore
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object
// by calling its Build function
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder {
func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger) *metricBuilder {
var regex *regexp.Regexp
if startTimeMetricRegex != "" {
regex, _ = regexp.Compile(startTimeMetricRegex)
Expand All @@ -78,7 +76,6 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
droppedTimeseries: 0,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: regex,
stalenessStore: stalenessStore,
}
}

Expand All @@ -91,7 +88,7 @@ func (b *metricBuilder) matchStartTimeMetric(metricName string) bool {
}

// AddDataPoint is for feeding prometheus data complexValue in its processing order
func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr error) {
func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error {
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
Expand All @@ -109,14 +106,6 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr
return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels)
}

defer func() {
// Only mark this data point as in the current scrape
// iff it isn't a stale metric.
if rerr == nil && !value.IsStaleNaN(v) {
b.stalenessStore.markAsCurrentlySeen(ls, t)
}
}()

metricName := ls.Get(model.MetricNameLabel)
switch {
case metricName == "":
Expand Down
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
mc := newMockMetadataCache(testMetadata)
st := startTs
for i, page := range tt.inputs {
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand All @@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
st := startTs
for _, page := range tt.inputs {
b := newMetricBuilder(mc, true, startTimeMetricRegex,
testLogger, dummyStalenessStore())
testLogger)
b.startTime = defaultBuilderStartTime // set to a non-zero value
for _, pt := range page.pts {
// set ts for testing
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) {
func Test_metricBuilder_baddata(t *testing.T) {
t.Run("empty-metric-name", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound {
t.Error("expecting errMetricNameNotFound error, but get nil")
Expand All @@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand All @@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) {

t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
b := newMetricBuilder(mc, true, "", testLogger)
b.startTime = 1.0 // set to a non-zero value
if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel {
t.Error("expecting errEmptyBoundaryLabel error, but get nil")
Expand Down Expand Up @@ -1452,7 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) {
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) {
mc := newMockMetadataCache(testMetadata)
mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore())
mb := newMetricBuilder(mc, true, "", testLogger)

dupLabels := labels.Labels{
{Name: "__name__", Value: "test"},
Expand Down
8 changes: 1 addition & 7 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ type OcaStore struct {
receiverID config.ComponentID
externalLabels labels.Labels

logger *zap.Logger
stalenessStore *stalenessStore
logger *zap.Logger
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
Expand All @@ -75,7 +74,6 @@ func NewOcaStore(
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
externalLabels: externalLabels,
stalenessStore: newStalenessStore(),
}
}

Expand All @@ -90,9 +88,6 @@ func (o *OcaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *OcaStore) Appender(context.Context) storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
// Firstly prepare the stalenessStore for a new scrape cyle.
o.stalenessStore.refresh()

return newTransaction(
o.ctx,
o.jobsMap,
Expand All @@ -103,7 +98,6 @@ func (o *OcaStore) Appender(context.Context) storage.Appender {
o.sink,
o.externalLabels,
o.logger,
o.stalenessStore,
)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,12 @@ receivers:
scrape_interval: 2ms
static_configs:
- targets: [%q]
processors:
batch:
exporters:
prometheusremotewrite:
endpoint: %q
insecure: true
service:
pipelines:
metrics:
Expand Down
118 changes: 0 additions & 118 deletions receiver/prometheusreceiver/internal/staleness_store.go

This file was deleted.

58 changes: 0 additions & 58 deletions receiver/prometheusreceiver/internal/staleness_store_test.go

This file was deleted.

Loading

0 comments on commit 0741d82

Please sign in to comment.