From 64eeed92547aafcc118bb3c290007e09ab6f47c6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 11 Jul 2019 18:09:02 +0100 Subject: [PATCH] Add ChunksIterator method to Series interface. Signed-off-by: Bartek Plotka --- CHANGELOG.md | 1 + querier.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ querier_test.go | 57 +++++++++++++++++++++++++++++++--- 3 files changed, 136 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12364b09..0989f303 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s. - `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`. - [FEATURE] `chunckenc.Chunk.Iterator` method now takes a `chunckenc.Iterator` interface as an argument for reuse. + - [CHANGE] `Series` interface allows return chunk iterator that allows iterating over encoded chunks. ## 0.9.1 diff --git a/querier.go b/querier.go index fbd9493f..ea65a134 100644 --- a/querier.go +++ b/querier.go @@ -54,6 +54,9 @@ type Series interface { // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator + + // ChunkIterator returns a new iterator of the chunks of the series. + ChunkIterator() ChunkIterator } // querier aggregates querying results from time blocks within @@ -876,6 +879,10 @@ func (s *chunkSeries) Iterator() SeriesIterator { return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt) } +func (s *chunkSeries) ChunkIterator() ChunkIterator { + return &chunkIterator{chunks: s.chunks} +} + // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. @@ -904,6 +911,14 @@ func (s *chainedSeries) Iterator() SeriesIterator { return newChainedSeriesIterator(s.series...) } +func (s *chainedSeries) ChunkIterator() ChunkIterator { + ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))} + for _, s := range s.series { + ch.chain = append(ch.chain, s.ChunkIterator()) + } + return ch +} + // chainedSeriesIterator implements a series iterater over a list // of time-sorted, non-overlapping iterators. type chainedSeriesIterator struct { @@ -975,6 +990,14 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator { return newVerticalMergeSeriesIterator(s.series...) } +func (s *verticalChainedSeries) ChunkIterator() ChunkIterator { + ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))} + for _, s := range s.series { + ch.chain = append(ch.chain, s.ChunkIterator()) + } + return ch +} + // verticalMergeSeriesIterator implements a series iterater over a list // of time-sorted, time-overlapping iterators. type verticalMergeSeriesIterator struct { @@ -1210,3 +1233,62 @@ type errSeriesSet struct { func (s errSeriesSet) Next() bool { return false } func (s errSeriesSet) At() Series { return nil } func (s errSeriesSet) Err() error { return s.err } + +// ChunkIterator iterates over the chunk of a time series. +type ChunkIterator interface { + // At returns the meta. + At() chunks.Meta + // Next advances the iterator by one. + Next() bool + // Err returns optional error if Next is false. + Err() error +} + +type chunkIterator struct { + chunks []chunks.Meta + i int +} + +func (c *chunkIterator) Next() bool { + if c.i >= len(c.chunks) { + return false + } + c.i++ + return true +} + +func (c *chunkIterator) At() chunks.Meta { + return c.chunks[c.i-1] +} + +func (c *chunkIterator) Err() error { return nil } + +type chainedChunkIterator struct { + chain []ChunkIterator + i int + err error +} + +func (c *chainedChunkIterator) Next() bool { + if c.Err() != nil { + return false + } + + for c.i < len(c.chain) { + if c.chain[c.i].Next() { + return true + } + if err := c.chain[c.i].Err(); err != nil { + c.err = err + return false + } + c.i++ + } + return false +} + +func (c *chainedChunkIterator) At() chunks.Meta { + return c.chain[c.i].At() +} + +func (c *chainedChunkIterator) Err() error { return c.err } diff --git a/querier_test.go b/querier_test.go index 2be48fcd..15e4f27e 100644 --- a/querier_test.go +++ b/querier_test.go @@ -188,6 +188,13 @@ func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) { return r, it.Err() } +func expandChunkIterator(it ChunkIterator) (chks []chunks.Meta) { + for it.Next() { + chks = append(chks, it.At()) + } + return chks +} + type seriesSamples struct { lset map[string]string chunks [][]sample @@ -661,8 +668,9 @@ type itSeries struct { si SeriesIterator } -func (s itSeries) Iterator() SeriesIterator { return s.si } -func (s itSeries) Labels() labels.Labels { return labels.Labels{} } +func (s itSeries) Iterator() SeriesIterator { return s.si } +func (s itSeries) Labels() labels.Labels { return labels.Labels{} } +func (s itSeries) ChunkIterator() ChunkIterator { return nil } func TestSeriesIterator(t *testing.T) { itcases := []struct { @@ -1044,6 +1052,46 @@ func TestSeriesIterator(t *testing.T) { }) } +func TestChunkIterator(t *testing.T) { + it := &chunkIterator{} + testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) + + chks := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}), + } + it = &chunkIterator{chunks: chks} + testutil.Equals(t, chks, expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) +} + +func TestChainedChunkIterator(t *testing.T) { + it := &chainedChunkIterator{} + testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) + + chks1 := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}), + } + chks2 := []chunks.Meta(nil) + chks3 := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 1}, sample{4, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{5, 1}, sample{5, 2}}), + } + + it = &chainedChunkIterator{chain: []ChunkIterator{ + &chunkIterator{chunks: chks1}, + &chunkIterator{chunks: chks2}, + &chunkIterator{chunks: chks3}, + }} + testutil.Equals(t, append(chks1, chks3...), expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) +} + // Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { chkMetas := []chunks.Meta{ @@ -1439,8 +1487,9 @@ func newSeries(l map[string]string, s []tsdbutil.Sample) Series { iterator: func() SeriesIterator { return newListSeriesIterator(s) }, } } -func (m *mockSeries) Labels() labels.Labels { return m.labels() } -func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } +func (m *mockSeries) Labels() labels.Labels { return m.labels() } +func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } +func (m *mockSeries) ChunkIterator() ChunkIterator { return nil } type listSeriesIterator struct { list []tsdbutil.Sample