Skip to content

Commit

Permalink
Added test for mixed types and gauge reset fix
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Rabenhorst <[email protected]>
  • Loading branch information
rabenhorst committed Jul 31, 2023
1 parent 304e3d2 commit 979c139
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (h *histogramAggregator) add(s sample) {
}

if h.total > 0 {
if fh.DetectReset(h.previous) {
if fh.CounterResetHint != histogram.GaugeType && fh.DetectReset(h.previous) {
// Counter reset, correct the value.
h.counter.Add(fh)
h.resets++
Expand Down
293 changes: 257 additions & 36 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,51 +800,70 @@ func TestDownSampleNativeHistogram(t *testing.T) {
},
[]expectedHistogramAggregates{},
},
{
"gauge sum aggregate no counter reset",
generateFloatHistogramSamples(30_000, 0, 11),
[]expectedHistogramAggregates{
{
count: []sample{
{t: 299_999, v: 10},
{t: 300_000, v: 1},
},
counter: []sample{
{
t: 299_999,
fh: expectedFloatHistogramsCounter(generateFloatHistograms(10), histogram.UnknownCounterReset),
},
{
t: 300_000,
fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(10), histogram.NotCounterReset),
},
},
sum: []sample{
{
t: 299_999,
fh: expectedFloatHistogramsSum(generateFloatHistograms(10), 0),
},
{
t: 300_000,
fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(10), histogram.GaugeType),
},
},
},
},
[]expectedHistogramAggregates{
{
count: []sample{
{t: 300_000, v: 11},
},
counter: []sample{
{
t: 300_000,
fh: expectedFloatHistogramsCounter(generateFloatHistograms(11), histogram.UnknownCounterReset),
},
},
sum: []sample{
{
t: 300_000,
fh: expectedFloatHistogramsSum(generateFloatHistograms(11), 0),
},
},
},
},
},
}

logger := log.NewLogfmtLogger(os.Stderr)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dir := t.TempDir()
ser := &series{lset: labels.FromStrings("__name__", "a")}

mint := tt.samples[0].t
maxt := mint

chunk, app := newChunk(t, tt.samples[0].fh.CounterResetHint)

var previous *histogram.FloatHistogram
for i, s := range tt.samples {
if i > 0 && s.fh.DetectReset(previous) {
ser.chunks = append(ser.chunks, chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chunk,
})
chunk, app = newChunk(t, s.fh.CounterResetHint)
mint = s.t
}
if s.t > maxt {
maxt = s.t
}
app.AppendFloatHistogram(s.t, s.fh)
previous = s.fh
}

ser.chunks = append(ser.chunks, chunks.Meta{
MinTime: math.MaxInt64,
MaxTime: math.MinInt64,
Chunk: chunk,
})

mb := newMemBlock()
mb.addSeries(ser)
mb := blockFromChunks(chunksFromHistogramSamples(t, tt.samples))

fakeMeta := &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: tt.samples[0].t,
MaxTime: maxt,
MaxTime: tt.samples[len(tt.samples)-1].t,
},
}
idResLevel1, err := Downsample(logger, fakeMeta, mb, dir, ResLevel1)
Expand Down Expand Up @@ -874,7 +893,208 @@ func TestDownSampleNativeHistogram(t *testing.T) {
}
}

func newChunk(t *testing.T, counterResetHint histogram.CounterResetHint) (*chunkenc.FloatHistogramChunk, chunkenc.Appender) {
func TestDownsampleMixedChunkTypes(t *testing.T) {
var (
ts int64
samples []sample
rawChks []chunks.Meta
)

for _, fh := range tsdbutil.GenerateTestFloatHistograms(20) {
ts += 15_000
samples = append(samples, sample{
t: ts,
fh: fh,
})
}

rawChks = append(rawChks, chunksFromHistogramSamples(t, samples)...)

samples = samples[:0]
for i := 0; i < 20; i++ {
ts += 15_000
samples = append(samples, sample{
t: ts,
v: float64(i),
})
}

rawChks = append(rawChks, chunkFromSamples(t, samples))

dir := t.TempDir()
mb := blockFromChunks(rawChks)
fakeMeta := &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: 15_000,
MaxTime: ts,
},
}

logger := log.NewLogfmtLogger(os.Stderr)

idResLevel1, err := Downsample(logger, fakeMeta, mb, dir, ResLevel1)
testutil.Ok(t, err)

meta, chks := GetMetaAndChunks(t, dir, idResLevel1)

assertValidChunkTime(t, chks)

compareAggreggates(t, dir, ResLevel1, idResLevel1.String(), []expectedHistogramAggregates{
{
count: []sample{
{t: 299_999, v: 19},
{t: 300_000, v: 1},
},
counter: []sample{
{t: 299_999, fh: tsdbutil.GenerateTestFloatHistogram(18)},
{t: 300_000, fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(19), histogram.NotCounterReset)},
},
sum: []sample{
{
t: 299_999,
fh: expectedFloatHistogramsSum(tsdbutil.GenerateTestFloatHistograms(19), 0),
},
{
t: 300_000,
fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(19), histogram.GaugeType),
},
},
},
{
count: []sample{
{t: 599_999, v: 19},
{t: 600_000, v: 1},
},
counter: []sample{
{t: 315000, v: 0},
{t: 599_999, v: 18},
{t: 600_000, v: 19},
{t: 600_000, v: 19},
},
sum: []sample{
{t: 599_999, v: 171},
{t: 600_000, v: 19},
},
},
}, chks)

blk, err := tsdb.OpenBlock(logger, filepath.Join(dir, idResLevel1.String()), NewPool())
testutil.Ok(t, err)
idResLevel2, err := Downsample(logger, meta, blk, dir, ResLevel2)
testutil.Ok(t, err)

_, chks = GetMetaAndChunks(t, dir, idResLevel2)

assertValidChunkTime(t, chks)

compareAggreggates(t, dir, ResLevel2, idResLevel2.String(), []expectedHistogramAggregates{
{
count: []sample{
{t: 300_000, v: 20},
},
counter: []sample{
{
t: 300_000,
fh: tsdbutil.GenerateTestFloatHistogram(19),
},
},
sum: []sample{
{
t: 300_000,
fh: expectedFloatHistogramsSum(tsdbutil.GenerateTestFloatHistograms(20), 0),
},
},
},
{
count: []sample{
{t: 600000, v: 20},
},
counter: []sample{
{t: 315000, v: 0},
{t: 600_000, v: 19},
{t: 600_000, v: 19},
},
sum: []sample{
{t: 600_000, v: 190},
},
},
}, chks)
}

func withCounterResetHint(fh *histogram.FloatHistogram, hint histogram.CounterResetHint) *histogram.FloatHistogram {
fh.CounterResetHint = hint
return fh
}

func blockFromChunks(chks []chunks.Meta) *memBlock {
ser := &series{
lset: labels.FromStrings("__name__", "a"),
chunks: chks,
}
mb := newMemBlock()
mb.addSeries(ser)
return mb
}

func chunkFromSamples(t *testing.T, samples []sample) chunks.Meta {
if len(samples) == 0 {
return chunks.Meta{}
}

chk := chunkenc.NewXORChunk()
app, err := chk.Appender()
testutil.Ok(t, err)

for _, s := range samples {
app.Append(s.t, s.v)
}

return chunks.Meta{
MinTime: samples[0].t,
MaxTime: samples[len(samples)-1].t,
Chunk: chk,
}
}

func chunksFromHistogramSamples(t *testing.T, samples []sample) []chunks.Meta {
if len(samples) == 0 {
return nil
}

var (
mint = samples[0].t
maxt = samples[0].t
chks []chunks.Meta
)

chk, app := newHistogramChunk(t, samples[0].fh.CounterResetHint)

var previous *histogram.FloatHistogram
for i, s := range samples {
if i > 0 && s.fh.DetectReset(previous) {
chks = append(chks, chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
})
chk, app = newHistogramChunk(t, s.fh.CounterResetHint)
mint = s.t
}
app.AppendFloatHistogram(s.t, s.fh)
maxt = s.t
previous = s.fh
}

chks = append(chks, chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
})

return chks
}

func newHistogramChunk(t *testing.T, counterResetHint histogram.CounterResetHint) (*chunkenc.FloatHistogramChunk, chunkenc.Appender) {
raw := chunkenc.NewFloatHistogramChunk()
app, err := raw.Appender()
if counterResetHint == histogram.GaugeType {
Expand All @@ -884,6 +1104,7 @@ func newChunk(t *testing.T, counterResetHint histogram.CounterResetHint) (*chunk
return raw, app
}

//nolint:unparam
func generateFloatHistogramSamples(scrapeInterval, startTs int64, n ...int) []sample {
// Generate samples for the float histogram.
ts := startTs
Expand Down Expand Up @@ -956,7 +1177,7 @@ func compareAggreggates(t *testing.T, dir string, resLevel int64, blockID string

testUtilWithOps := testutil.WithGoCmp(
cmpopts.EquateApprox(0, 1e-9),
cmp.AllowUnexported(sample{}),
cmp.AllowUnexported(sample{}, histogram.FloatHistogram{}),
)

for i, c := range chks {
Expand Down

0 comments on commit 979c139

Please sign in to comment.