Skip to content

Commit

Permalink
NHCB scrape: refactor state handling and speed up scrape test (promet…
Browse files Browse the repository at this point in the history
…heus#15193)

* NHCB: scrape use state field and not booleans

From comment prometheus#14978 (comment)

Also make compareLabels read only and move storeLabels to the first
processed classic histogram series.

Signed-off-by: György Krajcsovits <[email protected]>

* Speed up TestConvertClassicHistogramsToNHCB 3x

Reduce the startup time and timeouts

Signed-off-by: György Krajcsovits <[email protected]>

* lint fix

Signed-off-by: György Krajcsovits <[email protected]>

---------

Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama authored Oct 22, 2024
1 parent 3bb5e28 commit aa81210
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 38 deletions.
69 changes: 34 additions & 35 deletions model/textparse/nhcbparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ import (
"github.com/prometheus/prometheus/util/convertnhcb"
)

type collectionState int

const (
stateStart collectionState = iota
stateCollecting
stateEmitting
)

// The NHCBParser wraps a Parser and converts classic histograms to native
// histograms with custom buckets.
//
Expand All @@ -48,6 +56,9 @@ type NHCBParser struct {
// Labels builder.
builder labels.ScratchBuilder

// State of the parser.
state collectionState

// Caches the values from the underlying parser.
// For Series and Histogram.
bytes []byte
Expand All @@ -64,9 +75,9 @@ type NHCBParser struct {

// Caches the entry itself if we are inserting a converted NHCB
// halfway through.
entry Entry
err error
justInsertedNHCB bool
entry Entry
err error

// Caches the values and metric for the inserted converted NHCB.
bytesNHCB []byte
hNHCB *histogram.Histogram
Expand All @@ -77,11 +88,10 @@ type NHCBParser struct {

// Collates values from the classic histogram series to build
// the converted histogram later.
tempLsetNHCB labels.Labels
tempNHCB convertnhcb.TempHistogram
tempExemplars []exemplar.Exemplar
tempExemplarCount int
isCollationInProgress bool
tempLsetNHCB labels.Labels
tempNHCB convertnhcb.TempHistogram
tempExemplars []exemplar.Exemplar
tempExemplarCount int

// Remembers the last base histogram metric name (assuming it's
// a classic histogram) so we can tell if the next float series
Expand All @@ -105,7 +115,7 @@ func (p *NHCBParser) Series() ([]byte, *int64, float64) {
}

func (p *NHCBParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
if p.justInsertedNHCB {
if p.state == stateEmitting {
return p.bytesNHCB, p.ts, p.hNHCB, p.fhNHCB
}
return p.bytes, p.ts, p.h, p.fh
Expand All @@ -128,7 +138,7 @@ func (p *NHCBParser) Comment() []byte {
}

func (p *NHCBParser) Metric(l *labels.Labels) string {
if p.justInsertedNHCB {
if p.state == stateEmitting {
*l = p.lsetNHCB
return p.metricStringNHCB
}
Expand All @@ -137,7 +147,7 @@ func (p *NHCBParser) Metric(l *labels.Labels) string {
}

func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
if p.justInsertedNHCB {
if p.state == stateEmitting {
if len(p.exemplars) == 0 {
return false
}
Expand All @@ -153,8 +163,8 @@ func (p *NHCBParser) CreatedTimestamp() *int64 {
}

func (p *NHCBParser) Next() (Entry, error) {
if p.justInsertedNHCB {
p.justInsertedNHCB = false
if p.state == stateEmitting {
p.state = stateStart
if p.entry == EntrySeries {
isNHCB := p.handleClassicHistogramSeries(p.lset)
if isNHCB && !p.keepClassicHistograms {
Expand Down Expand Up @@ -202,34 +212,21 @@ func (p *NHCBParser) Next() (Entry, error) {
}

// Return true if labels have changed and we should emit the NHCB.
// Update the stored labels if the labels have changed.
func (p *NHCBParser) compareLabels() bool {
// Collection not in progress.
if p.lastHistogramName == "" {
if p.typ == model.MetricTypeHistogram {
p.storeBaseLabels()
}
if p.state != stateCollecting {
return false
}
if p.typ != model.MetricTypeHistogram {
// Different metric type, emit the NHCB.
p.lastHistogramName = ""
// Different metric type.
return true
}

if p.lastHistogramName != convertnhcb.GetHistogramMetricBaseName(p.lset.Get(labels.MetricName)) {
// Different metric name.
p.storeBaseLabels()
return true
}
nextHash, _ := p.lset.HashWithoutLabels(p.hBuffer, labels.BucketLabel)
if p.lastHistogramLabelsHash != nextHash {
// Different label values.
p.storeBaseLabels()
return true
}

return false
// Different label values.
return p.lastHistogramLabelsHash != nextHash
}

// Save the label set of the classic histogram without suffix and bucket `le` label.
Expand Down Expand Up @@ -275,7 +272,10 @@ func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool {
}

func (p *NHCBParser) processClassicHistogramSeries(lset labels.Labels, suffix string, updateHist func(*convertnhcb.TempHistogram)) {
p.isCollationInProgress = true
if p.state != stateCollecting {
p.storeBaseLabels()
}
p.state = stateCollecting
p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, suffix)
p.storeExemplars()
updateHist(&p.tempNHCB)
Expand Down Expand Up @@ -308,9 +308,9 @@ func (p *NHCBParser) swapExemplars() {
}

// processNHCB converts the collated classic histogram series to NHCB and caches the info
// to be returned to callers.
// to be returned to callers. Retruns true if the conversion was successful.
func (p *NHCBParser) processNHCB() bool {
if !p.isCollationInProgress {
if p.state != stateCollecting {
return false
}
ub := make([]float64, 0, len(p.tempNHCB.BucketCounts))
Expand Down Expand Up @@ -338,7 +338,6 @@ func (p *NHCBParser) processNHCB() bool {
p.lsetNHCB = p.tempLsetNHCB
p.swapExemplars()
p.tempNHCB = convertnhcb.NewTempHistogram()
p.isCollationInProgress = false
p.justInsertedNHCB = true
p.state = stateEmitting
return true
}
6 changes: 3 additions & 3 deletions scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3891,8 +3891,8 @@ metric: <
JobName: "test",
SampleLimit: 100,
Scheme: "http",
ScrapeInterval: model.Duration(100 * time.Millisecond),
ScrapeTimeout: model.Duration(100 * time.Millisecond),
ScrapeInterval: model.Duration(50 * time.Millisecond),
ScrapeTimeout: model.Duration(25 * time.Millisecond),
AlwaysScrapeClassicHistograms: tc.alwaysScrapeClassicHistograms,
ConvertClassicHistogramsToNHCB: tc.convertClassicHistToNHCB,
}
Expand Down Expand Up @@ -3931,7 +3931,7 @@ metric: <
}))
defer ts.Close()

sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t))
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()

Expand Down

0 comments on commit aa81210

Please sign in to comment.