Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sidecar, query, receiver: Native histogram support #6032

Merged
merged 8 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func registerReceive(app *extkingpin.App) {
EnableExemplarStorage: true,
HeadChunksWriteQueueSize: int(conf.tsdbWriteQueueSize),
EnableMemorySnapshotOnShutdown: conf.tsdbMemorySnapshotOnShutdown,
EnableNativeHistograms: conf.tsdbEnableNativeHistograms,
}

// Are we running in IngestorOnly, RouterOnly or RouterIngestor mode?
Expand Down Expand Up @@ -783,6 +784,7 @@ type receiveConfig struct {
tsdbMaxExemplars int64
tsdbWriteQueueSize int64
tsdbMemorySnapshotOnShutdown bool
tsdbEnableNativeHistograms bool

walCompression bool
noLockFile bool
Expand Down Expand Up @@ -895,6 +897,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts.").
Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown)

cmd.Flag("tsdb.enable-native-histograms",
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
"[EXPERIMENTAL] Enables the ingestion of native histograms.").
Default("false").Hidden().BoolVar(&rc.tsdbEnableNativeHistograms)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&rc.hashFunc, "SHA256", "")

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/prometheus/alertmanager v0.25.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
github.com/prometheus/common v0.39.1-0.20230110141620-846591a16635
github.com/prometheus/exporter-toolkit v0.8.2
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.41.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,8 @@ github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9
github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI=
github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
github.com/prometheus/common v0.39.1-0.20230110141620-846591a16635 h1:hK3y58iUBjRFZ6kFNJTWsES1GnVKsqEYUeiyeRXridQ=
github.com/prometheus/common v0.39.1-0.20230110141620-846591a16635/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
github.com/prometheus/exporter-toolkit v0.8.2 h1:sbJAfBXQFkG6sUkbwBun8MNdzW9+wd5YfPYofbmj0YM=
Expand Down
102 changes: 57 additions & 45 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ type adjustableSeriesIterator interface {

// adjustAtValue lets the implementation adjust its value, if needed, given the last value. This is used by the counter
// implementation, which can adjust for an obsolete counter value.
adjustAtValue(lastValue float64)
adjustAtValue(lastFloatValue float64)
}

type noopAdjustableSeriesIterator struct {
Expand Down Expand Up @@ -360,11 +360,11 @@ type counterErrAdjustSeriesIterator struct {
errAdjust float64
}

func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastValue float64) {
func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastFloatValue float64) {
_, v := it.At()
if lastValue > v {
if lastFloatValue > v {
// This replica has obsolete value (did not see the correct "end" of counter value before app restart). Adjust.
it.errAdjust += lastValue - v
it.errAdjust += lastFloatValue - v
}
}

Expand All @@ -380,32 +380,33 @@ type dedupSeriesIterator struct {

// TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more
// responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR.
lastT int64
lastV float64
lastT int64
lastIter chunkenc.Iterator

penA, penB int64
useA bool
}

func newDedupSeriesIterator(a, b adjustableSeriesIterator) *dedupSeriesIterator {
return &dedupSeriesIterator{
a: a,
b: b,
lastT: math.MinInt64,
lastV: float64(math.MinInt64),
aval: a.Next(),
bval: b.Next(),
a: a,
b: b,
lastT: math.MinInt64,
lastIter: a,
aval: a.Next(),
bval: b.Next(),
}
}

func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
lastValue := it.lastV
lastFloatVal, isFloatVal := it.lastFloatVal()
lastUseA := it.useA
defer func() {
if it.useA != lastUseA {
if it.useA != lastUseA && isFloatVal {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adjustAtValue seems to already be protected against doing something with non-float values but this makes code clearer 👍

// We switched replicas.
// Ensure values are correct based on the value before At.
it.adjustAtValue(lastValue)
// TODO(rabenhorst): Investigate if we also need to implement adjusting histograms here.
it.adjustAtValue(lastFloatVal)
fpetkovski marked this conversation as resolved.
Show resolved Hide resolved
}
}()

Expand All @@ -421,23 +422,25 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
if it.aval == chunkenc.ValNone {
it.useA = false
if it.bval != chunkenc.ValNone {
it.lastT, it.lastV = it.b.At()
it.lastT = it.b.AtT()
it.lastIter = it.b
it.penB = 0
}
return it.bval
}
if it.bval == chunkenc.ValNone {
it.useA = true
it.lastT, it.lastV = it.a.At()
it.lastT = it.a.AtT()
it.lastIter = it.a
it.penA = 0
return it.aval
}
// General case where both iterators still have data. We pick the one
// with the smaller timestamp.
// The applied penalty has potentially already skipped samples
// that would have resulted in an exaggerated sampling frequency.
ta, va := it.a.At()
tb, vb := it.b.At()
ta := it.a.AtT()
tb := it.b.AtT()

it.useA = ta <= tb

Expand All @@ -458,7 +461,8 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
}
it.penA = 0
it.lastT = ta
it.lastV = va
it.lastIter = it.a

return it.aval
}
if it.lastT != math.MinInt64 {
Expand All @@ -468,26 +472,40 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
}
it.penB = 0
it.lastT = tb
it.lastV = vb
it.lastIter = it.b
return it.bval
}

func (it *dedupSeriesIterator) adjustAtValue(lastValue float64) {
if it.aval != chunkenc.ValNone {
it.a.adjustAtValue(lastValue)
func (it *dedupSeriesIterator) lastFloatVal() (float64, bool) {
if it.useA && it.aval == chunkenc.ValFloat {
_, v := it.lastIter.At()
return v, true
}
if it.bval != chunkenc.ValNone {
it.b.adjustAtValue(lastValue)
if !it.useA && it.bval == chunkenc.ValFloat {
_, v := it.lastIter.At()
return v, true
}
return 0, false
}

func (it *dedupSeriesIterator) adjustAtValue(lastFloatValue float64) {
if it.aval == chunkenc.ValFloat {
it.a.adjustAtValue(lastFloatValue)
}
if it.bval == chunkenc.ValFloat {
it.b.adjustAtValue(lastFloatValue)
}
}

// TODO(rabenhorst): Native histogram support needs to be implemented, float type hardcoded.
func (it *dedupSeriesIterator) Seek(t int64) chunkenc.ValueType {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
ts, _ := it.At()
ts := it.AtT()
if ts >= t {
return chunkenc.ValFloat
if it.useA {
return it.a.Seek(ts)
}
return it.b.Seek(ts)
}
if it.Next() == chunkenc.ValNone {
return chunkenc.ValNone
Expand All @@ -496,27 +514,23 @@ func (it *dedupSeriesIterator) Seek(t int64) chunkenc.ValueType {
}

func (it *dedupSeriesIterator) At() (int64, float64) {
if it.useA {
return it.a.At()
}
return it.b.At()
return it.lastIter.At()
}

// TODO(rabenhorst): Needs to be implemented for native histogram support.
func (it *dedupSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
panic("not implemented")
return it.lastIter.AtHistogram()
}

func (it *dedupSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
panic("not implemented")
return it.lastIter.AtFloatHistogram()
}

func (it *dedupSeriesIterator) AtT() int64 {
var t int64
if it.useA {
t, _ = it.a.At()
t = it.a.AtT()
} else {
t, _ = it.b.At()
t = it.b.AtT()
}
return t
}
Expand Down Expand Up @@ -553,33 +567,31 @@ func (it *boundedSeriesIterator) At() (t int64, v float64) {
return it.it.At()
}

// TODO(rabenhorst): Needs to be implemented for native histogram support.
func (it *boundedSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
panic("not implemented")
return it.it.AtHistogram()
}

func (it *boundedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
panic("not implemented")
return it.it.AtFloatHistogram()
}

func (it *boundedSeriesIterator) AtT() int64 {
t, _ := it.it.At()
return t
return it.it.AtT()
}

func (it *boundedSeriesIterator) Next() chunkenc.ValueType {
valueType := it.it.Next()
if valueType == chunkenc.ValNone {
return chunkenc.ValNone
}
t, _ := it.it.At()
t := it.it.AtT()

// Advance the iterator if we are before the valid interval.
if t < it.mint {
if it.Seek(it.mint) == chunkenc.ValNone {
return chunkenc.ValNone
}
t, _ = it.it.At()
t = it.it.AtT()
}
// Once we passed the valid interval, there is no going back.
if t <= it.maxt {
Expand Down
13 changes: 9 additions & 4 deletions pkg/dedup/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ var expectedRealSeriesWithStaleMarkerDeduplicatedForRate = []sample{

func TestDedupSeriesSet(t *testing.T) {
tests := []struct {
name string
input []series
exp []series
dedupLabels map[string]struct{}
isCounter bool
}{
{
// Single dedup label.
name: "Single dedup label",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}},
Expand Down Expand Up @@ -210,6 +211,7 @@ func TestDedupSeriesSet(t *testing.T) {
{
// Regression tests against: https://github.com/thanos-io/thanos/issues/2645.
// We were panicking on requests with more replica labels than overall labels in any series.
name: "Regression tests against #2645",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}},
Expand Down Expand Up @@ -238,7 +240,7 @@ func TestDedupSeriesSet(t *testing.T) {
dedupLabels: map[string]struct{}{"replica": {}, "replica2": {}, "replica3": {}, "replica4": {}, "replica5": {}, "replica6": {}, "replica7": {}},
},
{
// Multi dedup label.
name: "Multi dedup label",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}},
Expand Down Expand Up @@ -294,7 +296,7 @@ func TestDedupSeriesSet(t *testing.T) {
},
},
{
// Multi dedup label - some series don't have all dedup labels.
name: "Multi dedup label - some series don't have all dedup labels",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}},
Expand Down Expand Up @@ -322,6 +324,7 @@ func TestDedupSeriesSet(t *testing.T) {
// Now, depending on which replica we look at, we can see a totally different counter value in total, where total means
// after accounting for counter resets. We account for that in downsample.CounterSeriesIterator, mainly because
// we handle downsample Counter Aggregations specially (for detecting resets between chunks).
name: "Regression test against 2401",
isCounter: true,
input: []series{
{
Expand Down Expand Up @@ -362,6 +365,7 @@ func TestDedupSeriesSet(t *testing.T) {
},
{
// Same thing but not for counter should not adjust anything.
name: "Regression test with no counter adjustment",
isCounter: false,
input: []series{
{
Expand All @@ -387,6 +391,7 @@ func TestDedupSeriesSet(t *testing.T) {
{
// Regression test on real data against https://github.com/thanos-io/thanos/issues/2401.
// Real data with stale marker after downsample.CounterSeriesIterator (required for downsampling + rate).
name: "Regression test on real data against 2401",
isCounter: true,
input: []series{
{
Expand Down Expand Up @@ -456,7 +461,7 @@ func TestDedupSeriesSet(t *testing.T) {
}

for _, tcase := range tests {
t.Run("", func(t *testing.T) {
t.Run(tcase.name, func(t *testing.T) {
// If it is a counter then pass a function which expects a counter.
f := ""
if tcase.isCounter {
Expand Down
5 changes: 2 additions & 3 deletions pkg/dedup/pushdown_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,13 @@ func (it *pushdownSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHis
}

func (it *pushdownSeriesIterator) AtT() int64 {
t, _ := it.a.At()
t := it.a.AtT()
return t
}

// TODO(rabenhorst): Native histogram support needs to be implemented, currently float type is hardcoded.
func (it *pushdownSeriesIterator) Seek(t int64) chunkenc.ValueType {
for {
ts, _ := it.At()
ts := it.AtT()
if ts >= t {
return chunkenc.ValFloat
}
Expand Down
Loading