Skip to content

Commit

Permalink
sidecar, query, receiver: Native histogram support (thanos-io#6032)
Browse files Browse the repository at this point in the history
* Added native histogram support for sidecar, query, receiver.

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Added comment

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Cleanup native histogram tests

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Made native hist test ha and replace usage of at with att

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Fix deduplication with native histograms

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Enabled native histogram writing

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Committed missing files for write

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Added benchmark for histogram writing

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Fixed chunkSeriesIterator

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Enabled replication for native histogram write test

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Lint native histogram test imports

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Removed ToDo comments

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Cleanup

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* Renamed lastValue of dedup iter and added TODO

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* Fixed typo

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* Added hidden native histogram flag for receive tsdb

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* merge

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* Comments and reverted change to qfe

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* Go mod tidy

Signed-off-by: Sebastian Rabenhorst <[email protected]>

* Dedup iter nit

Signed-off-by: Sebastian Rabenhorst <[email protected]>

Signed-off-by: Sebastian Rabenhorst <[email protected]>
  • Loading branch information
rabenhorst authored and Nathaniel Graham committed Apr 17, 2023
1 parent 96a4dcd commit 2f78a14
Show file tree
Hide file tree
Showing 16 changed files with 2,050 additions and 354 deletions.
6 changes: 6 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func registerReceive(app *extkingpin.App) {
EnableExemplarStorage: true,
HeadChunksWriteQueueSize: int(conf.tsdbWriteQueueSize),
EnableMemorySnapshotOnShutdown: conf.tsdbMemorySnapshotOnShutdown,
EnableNativeHistograms: conf.tsdbEnableNativeHistograms,
}

// Are we running in IngestorOnly, RouterOnly or RouterIngestor mode?
Expand Down Expand Up @@ -783,6 +784,7 @@ type receiveConfig struct {
tsdbMaxExemplars int64
tsdbWriteQueueSize int64
tsdbMemorySnapshotOnShutdown bool
tsdbEnableNativeHistograms bool

walCompression bool
noLockFile bool
Expand Down Expand Up @@ -895,6 +897,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts.").
Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown)

cmd.Flag("tsdb.enable-native-histograms",
"[EXPERIMENTAL] Enables the ingestion of native histograms.").
Default("false").Hidden().BoolVar(&rc.tsdbEnableNativeHistograms)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&rc.hashFunc, "SHA256", "")

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/prometheus/alertmanager v0.25.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
github.com/prometheus/common v0.39.1-0.20230110141620-846591a16635
github.com/prometheus/exporter-toolkit v0.8.2
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.41.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,8 @@ github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9
github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI=
github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
github.com/prometheus/common v0.39.1-0.20230110141620-846591a16635 h1:hK3y58iUBjRFZ6kFNJTWsES1GnVKsqEYUeiyeRXridQ=
github.com/prometheus/common v0.39.1-0.20230110141620-846591a16635/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
github.com/prometheus/exporter-toolkit v0.8.2 h1:sbJAfBXQFkG6sUkbwBun8MNdzW9+wd5YfPYofbmj0YM=
Expand Down
102 changes: 57 additions & 45 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ type adjustableSeriesIterator interface {

// adjustAtValue allows to adjust value by implementation if needed knowing the last value. This is used by counter
// implementation which can adjust for obsolete counter value.
adjustAtValue(lastValue float64)
adjustAtValue(lastFloatValue float64)
}

type noopAdjustableSeriesIterator struct {
Expand Down Expand Up @@ -360,11 +360,11 @@ type counterErrAdjustSeriesIterator struct {
errAdjust float64
}

func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastValue float64) {
func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastFloatValue float64) {
_, v := it.At()
if lastValue > v {
if lastFloatValue > v {
// This replica has obsolete value (did not see the correct "end" of counter value before app restart). Adjust.
it.errAdjust += lastValue - v
it.errAdjust += lastFloatValue - v
}
}

Expand All @@ -380,32 +380,33 @@ type dedupSeriesIterator struct {

// TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more
// responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR.
lastT int64
lastV float64
lastT int64
lastIter chunkenc.Iterator

penA, penB int64
useA bool
}

func newDedupSeriesIterator(a, b adjustableSeriesIterator) *dedupSeriesIterator {
return &dedupSeriesIterator{
a: a,
b: b,
lastT: math.MinInt64,
lastV: float64(math.MinInt64),
aval: a.Next(),
bval: b.Next(),
a: a,
b: b,
lastT: math.MinInt64,
lastIter: a,
aval: a.Next(),
bval: b.Next(),
}
}

func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
lastValue := it.lastV
lastFloatVal, isFloatVal := it.lastFloatVal()
lastUseA := it.useA
defer func() {
if it.useA != lastUseA {
if it.useA != lastUseA && isFloatVal {
// We switched replicas.
// Ensure values are correct bases on value before At.
it.adjustAtValue(lastValue)
// TODO(rabenhorst): Investigate if we also need to implement adjusting histograms here.
it.adjustAtValue(lastFloatVal)
}
}()

Expand All @@ -421,23 +422,25 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
if it.aval == chunkenc.ValNone {
it.useA = false
if it.bval != chunkenc.ValNone {
it.lastT, it.lastV = it.b.At()
it.lastT = it.b.AtT()
it.lastIter = it.b
it.penB = 0
}
return it.bval
}
if it.bval == chunkenc.ValNone {
it.useA = true
it.lastT, it.lastV = it.a.At()
it.lastT = it.a.AtT()
it.lastIter = it.a
it.penA = 0
return it.aval
}
// General case where both iterators still have data. We pick the one
// with the smaller timestamp.
// The applied penalty potentially already skipped potential samples already
// that would have resulted in exaggerated sampling frequency.
ta, va := it.a.At()
tb, vb := it.b.At()
ta := it.a.AtT()
tb := it.b.AtT()

it.useA = ta <= tb

Expand All @@ -458,7 +461,8 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
}
it.penA = 0
it.lastT = ta
it.lastV = va
it.lastIter = it.a

return it.aval
}
if it.lastT != math.MinInt64 {
Expand All @@ -468,26 +472,40 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
}
it.penB = 0
it.lastT = tb
it.lastV = vb
it.lastIter = it.b
return it.bval
}

func (it *dedupSeriesIterator) adjustAtValue(lastValue float64) {
if it.aval != chunkenc.ValNone {
it.a.adjustAtValue(lastValue)
func (it *dedupSeriesIterator) lastFloatVal() (float64, bool) {
if it.useA && it.aval == chunkenc.ValFloat {
_, v := it.lastIter.At()
return v, true
}
if it.bval != chunkenc.ValNone {
it.b.adjustAtValue(lastValue)
if !it.useA && it.bval == chunkenc.ValFloat {
_, v := it.lastIter.At()
return v, true
}
return 0, false
}

func (it *dedupSeriesIterator) adjustAtValue(lastFloatValue float64) {
if it.aval == chunkenc.ValFloat {
it.a.adjustAtValue(lastFloatValue)
}
if it.bval == chunkenc.ValFloat {
it.b.adjustAtValue(lastFloatValue)
}
}

// TODO(rabenhorst): Native histogram support needs to be implemented, float type hardcoded.
func (it *dedupSeriesIterator) Seek(t int64) chunkenc.ValueType {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
ts, _ := it.At()
ts := it.AtT()
if ts >= t {
return chunkenc.ValFloat
if it.useA {
return it.a.Seek(ts)
}
return it.b.Seek(ts)
}
if it.Next() == chunkenc.ValNone {
return chunkenc.ValNone
Expand All @@ -496,27 +514,23 @@ func (it *dedupSeriesIterator) Seek(t int64) chunkenc.ValueType {
}

func (it *dedupSeriesIterator) At() (int64, float64) {
if it.useA {
return it.a.At()
}
return it.b.At()
return it.lastIter.At()
}

// TODO(rabenhorst): Needs to be implemented for native histogram support.
func (it *dedupSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
panic("not implemented")
return it.lastIter.AtHistogram()
}

func (it *dedupSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
panic("not implemented")
return it.lastIter.AtFloatHistogram()
}

func (it *dedupSeriesIterator) AtT() int64 {
var t int64
if it.useA {
t, _ = it.a.At()
t = it.a.AtT()
} else {
t, _ = it.b.At()
t = it.b.AtT()
}
return t
}
Expand Down Expand Up @@ -553,33 +567,31 @@ func (it *boundedSeriesIterator) At() (t int64, v float64) {
return it.it.At()
}

// TODO(rabenhorst): Needs to be implemented for native histogram support.
func (it *boundedSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
panic("not implemented")
return it.it.AtHistogram()
}

func (it *boundedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
panic("not implemented")
return it.it.AtFloatHistogram()
}

func (it *boundedSeriesIterator) AtT() int64 {
t, _ := it.it.At()
return t
return it.it.AtT()
}

func (it *boundedSeriesIterator) Next() chunkenc.ValueType {
valueType := it.it.Next()
if valueType == chunkenc.ValNone {
return chunkenc.ValNone
}
t, _ := it.it.At()
t := it.it.AtT()

// Advance the iterator if we are before the valid interval.
if t < it.mint {
if it.Seek(it.mint) == chunkenc.ValNone {
return chunkenc.ValNone
}
t, _ = it.it.At()
t = it.it.AtT()
}
// Once we passed the valid interval, there is no going back.
if t <= it.maxt {
Expand Down
13 changes: 9 additions & 4 deletions pkg/dedup/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ var expectedRealSeriesWithStaleMarkerDeduplicatedForRate = []sample{

func TestDedupSeriesSet(t *testing.T) {
tests := []struct {
name string
input []series
exp []series
dedupLabels map[string]struct{}
isCounter bool
}{
{
// Single dedup label.
name: "Single dedup label",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}},
Expand Down Expand Up @@ -210,6 +211,7 @@ func TestDedupSeriesSet(t *testing.T) {
{
// Regression tests against: https://github.com/thanos-io/thanos/issues/2645.
// We were panicking on requests with more replica labels than overall labels in any series.
name: "Regression tests against #2645",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}},
Expand Down Expand Up @@ -238,7 +240,7 @@ func TestDedupSeriesSet(t *testing.T) {
dedupLabels: map[string]struct{}{"replica": {}, "replica2": {}, "replica3": {}, "replica4": {}, "replica5": {}, "replica6": {}, "replica7": {}},
},
{
// Multi dedup label.
name: "Multi dedup label",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}},
Expand Down Expand Up @@ -294,7 +296,7 @@ func TestDedupSeriesSet(t *testing.T) {
},
},
{
// Multi dedup label - some series don't have all dedup labels.
name: "Multi dedup label - some series don't have all dedup labels",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}},
Expand Down Expand Up @@ -322,6 +324,7 @@ func TestDedupSeriesSet(t *testing.T) {
// Now, depending on what replica we look, we can see totally different counter value in total where total means
// after accounting for counter resets. We account for that in downsample.CounterSeriesIterator, mainly because
// we handle downsample Counter Aggregations specially (for detecting resets between chunks).
name: "Regression test against 2401",
isCounter: true,
input: []series{
{
Expand Down Expand Up @@ -362,6 +365,7 @@ func TestDedupSeriesSet(t *testing.T) {
},
{
// Same thing but not for counter should not adjust anything.
name: "Regression test with no counter adjustment",
isCounter: false,
input: []series{
{
Expand All @@ -387,6 +391,7 @@ func TestDedupSeriesSet(t *testing.T) {
{
// Regression test on real data against https://github.com/thanos-io/thanos/issues/2401.
// Real data with stale marker after downsample.CounterSeriesIterator (required for downsampling + rate).
name: "Regression test on real data against 2401",
isCounter: true,
input: []series{
{
Expand Down Expand Up @@ -456,7 +461,7 @@ func TestDedupSeriesSet(t *testing.T) {
}

for _, tcase := range tests {
t.Run("", func(t *testing.T) {
t.Run(tcase.name, func(t *testing.T) {
// If it is a counter then pass a function which expects a counter.
f := ""
if tcase.isCounter {
Expand Down
5 changes: 2 additions & 3 deletions pkg/dedup/pushdown_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,13 @@ func (it *pushdownSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHis
}

func (it *pushdownSeriesIterator) AtT() int64 {
t, _ := it.a.At()
t := it.a.AtT()
return t
}

// TODO(rabenhorst): Native histogram support needs to be implemented, currently float type is hardcoded.
func (it *pushdownSeriesIterator) Seek(t int64) chunkenc.ValueType {
for {
ts, _ := it.At()
ts := it.AtT()
if ts >= t {
return chunkenc.ValFloat
}
Expand Down
Loading

0 comments on commit 2f78a14

Please sign in to comment.