diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 47a9b77fef0..71aaaaf7871 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -356,7 +356,7 @@ func (h *histogramAggregator) add(s sample) { } if h.total > 0 { - if fh.DetectReset(h.previous) { + if fh.CounterResetHint != histogram.GaugeType && fh.DetectReset(h.previous) { // Counter reset, correct the value. h.counter.Add(fh) h.resets++ diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 0751b2fc6c9..9ed723bd0bd 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -800,6 +800,57 @@ func TestDownSampleNativeHistogram(t *testing.T) { }, []expectedHistogramAggregates{}, }, + { + "gauge sum aggregate no counter reset", + generateFloatHistogramSamples(30_000, 0, 11), + []expectedHistogramAggregates{ + { + count: []sample{ + {t: 299_999, v: 10}, + {t: 300_000, v: 1}, + }, + counter: []sample{ + { + t: 299_999, + fh: expectedFloatHistogramsCounter(generateFloatHistograms(10), histogram.UnknownCounterReset), + }, + { + t: 300_000, + fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(10), histogram.NotCounterReset), + }, + }, + sum: []sample{ + { + t: 299_999, + fh: expectedFloatHistogramsSum(generateFloatHistograms(10), 0), + }, + { + t: 300_000, + fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(10), histogram.GaugeType), + }, + }, + }, + }, + []expectedHistogramAggregates{ + { + count: []sample{ + {t: 300_000, v: 11}, + }, + counter: []sample{ + { + t: 300_000, + fh: expectedFloatHistogramsCounter(generateFloatHistograms(11), histogram.UnknownCounterReset), + }, + }, + sum: []sample{ + { + t: 300_000, + fh: expectedFloatHistogramsSum(generateFloatHistograms(11), 0), + }, + }, + }, + }, + }, } logger := log.NewLogfmtLogger(os.Stderr) @@ -807,44 +858,12 @@ func TestDownSampleNativeHistogram(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { dir := t.TempDir() - ser := &series{lset: labels.FromStrings("__name__", "a")} - - mint := tt.samples[0].t - maxt := mint - - chunk, app := newChunk(t, tt.samples[0].fh.CounterResetHint) - - var previous *histogram.FloatHistogram - for i, s := range tt.samples { - if i > 0 && s.fh.DetectReset(previous) { - ser.chunks = append(ser.chunks, chunks.Meta{ - MinTime: mint, - MaxTime: maxt, - Chunk: chunk, - }) - chunk, app = newChunk(t, s.fh.CounterResetHint) - mint = s.t - } - if s.t > maxt { - maxt = s.t - } - app.AppendFloatHistogram(s.t, s.fh) - previous = s.fh - } - - ser.chunks = append(ser.chunks, chunks.Meta{ - MinTime: math.MaxInt64, - MaxTime: math.MinInt64, - Chunk: chunk, - }) - - mb := newMemBlock() - mb.addSeries(ser) + mb := blockFromChunks(chunksFromHistogramSamples(t, tt.samples)) fakeMeta := &metadata.Meta{ BlockMeta: tsdb.BlockMeta{ MinTime: tt.samples[0].t, - MaxTime: maxt, + MaxTime: tt.samples[len(tt.samples)-1].t, }, } idResLevel1, err := Downsample(logger, fakeMeta, mb, dir, ResLevel1) @@ -874,7 +893,208 @@ func TestDownSampleNativeHistogram(t *testing.T) { } } -func newChunk(t *testing.T, counterResetHint histogram.CounterResetHint) (*chunkenc.FloatHistogramChunk, chunkenc.Appender) { +func TestDownsampleMixedChunkTypes(t *testing.T) { + var ( + ts int64 + samples []sample + rawChks []chunks.Meta + ) + + for _, fh := range tsdbutil.GenerateTestFloatHistograms(20) { + ts += 15_000 + samples = append(samples, sample{ + t: ts, + fh: fh, + }) + } + + rawChks = append(rawChks, chunksFromHistogramSamples(t, samples)...) + + samples = samples[:0] + for i := 0; i < 20; i++ { + ts += 15_000 + samples = append(samples, sample{ + t: ts, + v: float64(i), + }) + } + + rawChks = append(rawChks, chunkFromSamples(t, samples)) + + dir := t.TempDir() + mb := blockFromChunks(rawChks) + fakeMeta := &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + MinTime: 15_000, + MaxTime: ts, + }, + } + + logger := log.NewLogfmtLogger(os.Stderr) + + idResLevel1, err := Downsample(logger, fakeMeta, mb, dir, ResLevel1) + testutil.Ok(t, err) + + meta, chks := GetMetaAndChunks(t, dir, idResLevel1) + + assertValidChunkTime(t, chks) + + compareAggreggates(t, dir, ResLevel1, idResLevel1.String(), []expectedHistogramAggregates{ + { + count: []sample{ + {t: 299_999, v: 19}, + {t: 300_000, v: 1}, + }, + counter: []sample{ + {t: 299_999, fh: tsdbutil.GenerateTestFloatHistogram(18)}, + {t: 300_000, fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(19), histogram.NotCounterReset)}, + }, + sum: []sample{ + { + t: 299_999, + fh: expectedFloatHistogramsSum(tsdbutil.GenerateTestFloatHistograms(19), 0), + }, + { + t: 300_000, + fh: withCounterResetHint(tsdbutil.GenerateTestFloatHistogram(19), histogram.GaugeType), + }, + }, + }, + { + count: []sample{ + {t: 599_999, v: 19}, + {t: 600_000, v: 1}, + }, + counter: []sample{ + {t: 315000, v: 0}, + {t: 599_999, v: 18}, + {t: 600_000, v: 19}, + {t: 600_000, v: 19}, + }, + sum: []sample{ + {t: 599_999, v: 171}, + {t: 600_000, v: 19}, + }, + }, + }, chks) + + blk, err := tsdb.OpenBlock(logger, filepath.Join(dir, idResLevel1.String()), NewPool()) + testutil.Ok(t, err) + idResLevel2, err := Downsample(logger, meta, blk, dir, ResLevel2) + testutil.Ok(t, err) + + _, chks = GetMetaAndChunks(t, dir, idResLevel2) + + assertValidChunkTime(t, chks) + + compareAggreggates(t, dir, ResLevel2, idResLevel2.String(), []expectedHistogramAggregates{ + { + count: []sample{ + {t: 300_000, v: 20}, + }, + counter: []sample{ + { + t: 300_000, + fh: tsdbutil.GenerateTestFloatHistogram(19), + }, + }, + sum: []sample{ + { + t: 300_000, + fh: expectedFloatHistogramsSum(tsdbutil.GenerateTestFloatHistograms(20), 0), + }, + }, + }, + { + count: []sample{ + {t: 600000, v: 20}, + }, + counter: []sample{ + {t: 315000, v: 0}, + {t: 600_000, v: 19}, + {t: 600_000, v: 19}, + }, + sum: []sample{ + {t: 600_000, v: 190}, + }, + }, + }, chks) +} + +func withCounterResetHint(fh *histogram.FloatHistogram, hint histogram.CounterResetHint) *histogram.FloatHistogram { + fh.CounterResetHint = hint + return fh +} + +func blockFromChunks(chks []chunks.Meta) *memBlock { + ser := &series{ + lset: labels.FromStrings("__name__", "a"), + chunks: chks, + } + mb := newMemBlock() + mb.addSeries(ser) + return mb +} + +func chunkFromSamples(t *testing.T, samples []sample) chunks.Meta { + if len(samples) == 0 { + return chunks.Meta{} + } + + chk := chunkenc.NewXORChunk() + app, err := chk.Appender() + testutil.Ok(t, err) + + for _, s := range samples { + app.Append(s.t, s.v) + } + + return chunks.Meta{ + MinTime: samples[0].t, + MaxTime: samples[len(samples)-1].t, + Chunk: chk, + } +} + +func chunksFromHistogramSamples(t *testing.T, samples []sample) []chunks.Meta { + if len(samples) == 0 { + return nil + } + + var ( + mint = samples[0].t + maxt = samples[0].t + chks []chunks.Meta + ) + + chk, app := newHistogramChunk(t, samples[0].fh.CounterResetHint) + + var previous *histogram.FloatHistogram + for i, s := range samples { + if i > 0 && s.fh.DetectReset(previous) { + chks = append(chks, chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: chk, + }) + chk, app = newHistogramChunk(t, s.fh.CounterResetHint) + mint = s.t + } + app.AppendFloatHistogram(s.t, s.fh) + maxt = s.t + previous = s.fh + } + + chks = append(chks, chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: chk, + }) + + return chks +} + +func newHistogramChunk(t *testing.T, counterResetHint histogram.CounterResetHint) (*chunkenc.FloatHistogramChunk, chunkenc.Appender) { raw := chunkenc.NewFloatHistogramChunk() app, err := raw.Appender() if counterResetHint == histogram.GaugeType { @@ -884,6 +1104,7 @@ func newChunk(t *testing.T, counterResetHint histogram.CounterResetHint) (*chunk return raw, app } +//nolint:unparam func generateFloatHistogramSamples(scrapeInterval, startTs int64, n ...int) []sample { // Generate samples for the float histogram. ts := startTs @@ -956,7 +1177,7 @@ func compareAggreggates(t *testing.T, dir string, resLevel int64, blockID string testUtilWithOps := testutil.WithGoCmp( cmpopts.EquateApprox(0, 1e-9), - cmp.AllowUnexported(sample{}), + cmp.AllowUnexported(sample{}, histogram.FloatHistogram{}), ) for i, c := range chks {