Skip to content

Commit

Permalink
feat(nhcb): implement created timestamp handling (prometheus#15198)
Browse files Browse the repository at this point in the history
Call through to the underlaying parser if we are not in a histogram
and the entry is a series or exponential native histogram. Otherwise store
and retrieve CT for NHCB.

* fix(omparser): losing exemplars when CT is parsed

Fixes: prometheus#15137
Ignore exemplars while peeking ahead during CT parsing.
Simplify state reset with defer().

Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama authored Oct 24, 2024
1 parent cccbe72 commit 2182b83
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 44 deletions.
6 changes: 3 additions & 3 deletions model/textparse/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,13 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) {
}

p.Metric(&got.lset)
for e := (exemplar.Exemplar{}); p.Exemplar(&e); {
got.es = append(got.es, e)
}
// Parser reuses int pointer.
if ct := p.CreatedTimestamp(); ct != nil {
got.ct = int64p(*ct)
}
for e := (exemplar.Exemplar{}); p.Exemplar(&e); {
got.es = append(got.es, e)
}
case EntryType:
m, got.typ = p.Type()
got.m = string(m)
Expand Down
36 changes: 24 additions & 12 deletions model/textparse/nhcbparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type NHCBParser struct {
fhNHCB *histogram.FloatHistogram
lsetNHCB labels.Labels
exemplars []exemplar.Exemplar
ctNHCB *int64
metricStringNHCB string

// Collates values from the classic histogram series to build
Expand All @@ -92,6 +93,7 @@ type NHCBParser struct {
tempNHCB convertnhcb.TempHistogram
tempExemplars []exemplar.Exemplar
tempExemplarCount int
tempCT *int64

// Remembers the last base histogram metric name (assuming it's
// a classic histogram) so we can tell if the next float series
Expand Down Expand Up @@ -159,6 +161,16 @@ func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
}

func (p *NHCBParser) CreatedTimestamp() *int64 {
switch p.state {
case stateStart:
if p.entry == EntrySeries || p.entry == EntryHistogram {
return p.parser.CreatedTimestamp()
}
case stateCollecting:
return p.parser.CreatedTimestamp()
case stateEmitting:
return p.ctNHCB
}
return nil
}

Expand All @@ -174,41 +186,38 @@ func (p *NHCBParser) Next() (Entry, error) {
}
return p.entry, p.err
}
et, err := p.parser.Next()
if err != nil {
if errors.Is(err, io.EOF) && p.processNHCB() {
p.entry = et
p.err = err

p.entry, p.err = p.parser.Next()
if p.err != nil {
if errors.Is(p.err, io.EOF) && p.processNHCB() {
return EntryHistogram, nil
}
return EntryInvalid, err
return EntryInvalid, p.err
}
switch et {
switch p.entry {
case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series()
p.metricString = p.parser.Metric(&p.lset)
// Check the label set to see if we can continue or need to emit the NHCB.
if p.compareLabels() && p.processNHCB() {
p.entry = et
return EntryHistogram, nil
}
isNHCB := p.handleClassicHistogramSeries(p.lset)
if isNHCB && !p.keepClassicHistograms {
// Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms.
return p.Next()
}
return et, err
return p.entry, p.err
case EntryHistogram:
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.metricString = p.parser.Metric(&p.lset)
case EntryType:
p.bName, p.typ = p.parser.Type()
}
if p.processNHCB() {
p.entry = et
return EntryHistogram, nil
}
return et, err
return p.entry, p.err
}

// Return true if labels have changed and we should emit the NHCB.
Expand Down Expand Up @@ -274,8 +283,9 @@ func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool {
func (p *NHCBParser) processClassicHistogramSeries(lset labels.Labels, suffix string, updateHist func(*convertnhcb.TempHistogram)) {
if p.state != stateCollecting {
p.storeBaseLabels()
p.tempCT = p.parser.CreatedTimestamp()
p.state = stateCollecting
}
p.state = stateCollecting
p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, suffix)
p.storeExemplars()
updateHist(&p.tempNHCB)
Expand Down Expand Up @@ -337,7 +347,9 @@ func (p *NHCBParser) processNHCB() bool {
p.bytesNHCB = []byte(p.metricStringNHCB)
p.lsetNHCB = p.tempLsetNHCB
p.swapExemplars()
p.ctNHCB = p.tempCT
p.tempNHCB = convertnhcb.NewTempHistogram()
p.state = stateEmitting
p.tempCT = nil
return true
}
46 changes: 29 additions & 17 deletions model/textparse/nhcbparse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,14 @@ foobar{quantile="0.99"} 150.1`
lset: labels.FromStrings("__name__", "foo_total"),
t: int64p(1520879607789),
es: []exemplar.Exemplar{{Labels: labels.FromStrings("id", "counter-test"), Value: 5}},
// TODO(krajorama): ct: int64p(1520872607123),
ct: int64p(1520872607123),
}, {
m: `foo_total{a="b"}`,
v: 17.0,
lset: labels.FromStrings("__name__", "foo_total", "a", "b"),
t: int64p(1520879607789),
es: []exemplar.Exemplar{{Labels: labels.FromStrings("id", "counter-test"), Value: 5}},
// TODO(krajorama): ct: int64p(1520872607123),
ct: int64p(1520872607123),
}, {
m: "bar",
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
Expand All @@ -310,22 +310,22 @@ foobar{quantile="0.99"} 150.1`
m: "bar_count",
v: 17.0,
lset: labels.FromStrings("__name__", "bar_count"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: "bar_sum",
v: 324789.3,
lset: labels.FromStrings("__name__", "bar_sum"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: `bar{quantile="0.95"}`,
v: 123.7,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: `bar{quantile="0.99"}`,
v: 150.0,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: "baz",
help: "Histogram with the same objective as above's summary",
Expand All @@ -343,7 +343,7 @@ foobar{quantile="0.99"} 150.1`
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "baz"),
// TODO(krajorama): ct: int64p(1520872609125),
ct: int64p(1520872609125),
}, {
m: "fizz_created",
help: "Gauge which shouldn't be parsed as CT",
Expand Down Expand Up @@ -371,7 +371,7 @@ foobar{quantile="0.99"} 150.1`
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "something"),
// TODO(krajorama): ct: int64p(1520430001000),
ct: int64p(1520430001000),
}, {
m: `something{a="b"}`,
shs: &histogram.Histogram{
Expand All @@ -383,7 +383,7 @@ foobar{quantile="0.99"} 150.1`
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "something", "a", "b"),
// TODO(krajorama): ct: int64p(1520430002000),
ct: int64p(1520430002000),
}, {
m: "yum",
help: "Summary with _created between sum and quantiles",
Expand All @@ -394,22 +394,22 @@ foobar{quantile="0.99"} 150.1`
m: `yum_count`,
v: 20,
lset: labels.FromStrings("__name__", "yum_count"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: `yum_sum`,
v: 324789.5,
lset: labels.FromStrings("__name__", "yum_sum"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: `yum{quantile="0.95"}`,
v: 123.7,
lset: labels.FromStrings("__name__", "yum", "quantile", "0.95"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: `yum{quantile="0.99"}`,
v: 150.0,
lset: labels.FromStrings("__name__", "yum", "quantile", "0.99"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: "foobar",
help: "Summary with _created as the first line",
Expand All @@ -420,22 +420,22 @@ foobar{quantile="0.99"} 150.1`
m: `foobar_count`,
v: 21,
lset: labels.FromStrings("__name__", "foobar_count"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: `foobar_sum`,
v: 324789.6,
lset: labels.FromStrings("__name__", "foobar_sum"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: `foobar{quantile="0.95"}`,
v: 123.8,
lset: labels.FromStrings("__name__", "foobar", "quantile", "0.95"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: `foobar{quantile="0.99"}`,
v: 150.1,
lset: labels.FromStrings("__name__", "foobar", "quantile", "0.99"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: "metric",
help: "foo\x00bar",
Expand Down Expand Up @@ -555,42 +555,49 @@ func TestNHCBParserProtoBufParser_NoNHCBWhenExponential(t *testing.T) {
},
lset: labels.FromStrings("__name__", "test_histogram"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_count",
v: 175,
lset: labels.FromStrings("__name__", "test_histogram_count"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_sum",
v: 0.0008280461746287094,
lset: labels.FromStrings("__name__", "test_histogram_sum"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff-0.0004899999999999998",
v: 2,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0004899999999999998"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff-0.0003899999999999998",
v: 4,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0003899999999999998"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff-0.0002899999999999998",
v: 16,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0002899999999999998"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff+Inf",
v: 175,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"),
t: int64p(1234568),
ct: int64p(1000),
},
{
// TODO(krajorama): optimize: this should not be here. In case there's
Expand All @@ -609,6 +616,7 @@ func TestNHCBParserProtoBufParser_NoNHCBWhenExponential(t *testing.T) {
},
lset: labels.FromStrings("__name__", "test_histogram"),
t: int64p(1234568),
ct: int64p(1000),
},
}
got := testParse(t, p)
Expand All @@ -621,6 +629,10 @@ help: "Test histogram with classic and exponential buckets."
type: HISTOGRAM
metric: <
histogram: <
created_timestamp: <
seconds: 1
nanos: 1
>
sample_count: 175
sample_sum: 0.0008280461746287094
bucket: <
Expand Down
Loading

0 comments on commit 2182b83

Please sign in to comment.