From 7feb2310d24c22dafcd09e47dfc4fce5a77db44e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 11 Jul 2019 18:09:02 +0100 Subject: [PATCH] Add ChunksIterator method to Series interface. Signed-off-by: Bartek Plotka --- querier.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++++ querier_test.go | 57 +++++++++++++++++++++++++++++--- 2 files changed, 140 insertions(+), 4 deletions(-) diff --git a/querier.go b/querier.go index fbd9493f..813c5a0d 100644 --- a/querier.go +++ b/querier.go @@ -54,6 +54,9 @@ type Series interface { // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator + + // ChunkIterator returns a new iterator of the chunks of the series. + ChunkIterator() ChunkIterator } // querier aggregates querying results from time blocks within @@ -876,6 +879,10 @@ func (s *chunkSeries) Iterator() SeriesIterator { return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt) } +func (s *chunkSeries) ChunkIterator() ChunkIterator { + return &chunkIterator{chunks: s.chunks} +} + // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. @@ -904,6 +911,14 @@ func (s *chainedSeries) Iterator() SeriesIterator { return newChainedSeriesIterator(s.series...) } +func (s *chainedSeries) ChunkIterator() ChunkIterator { + ch := &chainedChunkIterator{} + for _, s := range s.series { + ch.chain = append(ch.chain, s.ChunkIterator()) + } + return ch +} + // chainedSeriesIterator implements a series iterater over a list // of time-sorted, non-overlapping iterators. type chainedSeriesIterator struct { @@ -975,6 +990,14 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator { return newVerticalMergeSeriesIterator(s.series...) } +func (s *verticalChainedSeries) ChunkIterator() ChunkIterator { + ch := &chainedChunkIterator{} + for _, s := range s.series { + ch.chain = append(ch.chain, s.ChunkIterator()) + } + return ch +} + // verticalMergeSeriesIterator implements a series iterater over a list // of time-sorted, time-overlapping iterators. type verticalMergeSeriesIterator struct { @@ -1210,3 +1233,67 @@ type errSeriesSet struct { func (s errSeriesSet) Next() bool { return false } func (s errSeriesSet) At() Series { return nil } func (s errSeriesSet) Err() error { return s.err } + +// ChunkIterator iterates over the chunk of a time series. +type ChunkIterator interface { + // At returns the meta. + At() chunks.Meta + // Next advances the iterator by one. + Next() bool + // Err returns optional error if Next is false. + Err() error +} + +type chunkIterator struct { + chunks []chunks.Meta + i int +} + +func (c *chunkIterator) Next() bool { + if c.i >= len(c.chunks) { + return false + } + c.i++ + return true +} + +func (c *chunkIterator) At() chunks.Meta { + return c.chunks[c.i-1] +} + +func (c *chunkIterator) Err() error { return nil } + +type chainedChunkIterator struct { + chain []ChunkIterator + i int + err error +} + +func (c *chainedChunkIterator) Next() bool { + if c.i == 0 { + if len(c.chain) == 0 { + return false + } + c.i++ + } + + for { + if c.chain[c.i-1].Next() { + return true + } + if err := c.chain[c.i-1].Err(); err != nil { + c.err = err + return false + } + if c.i >= len(c.chain) { + return false + } + c.i++ + } +} + +func (c *chainedChunkIterator) At() chunks.Meta { + return c.chain[c.i-1].At() +} + +func (c *chainedChunkIterator) Err() error { return c.err } diff --git a/querier_test.go b/querier_test.go index 2be48fcd..15e4f27e 100644 --- a/querier_test.go +++ b/querier_test.go @@ -188,6 +188,13 @@ func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) { return r, it.Err() } +func expandChunkIterator(it ChunkIterator) (chks []chunks.Meta) { + for it.Next() { + chks = append(chks, it.At()) + } + return chks +} + type seriesSamples struct { lset map[string]string chunks [][]sample @@ -661,8 +668,9 @@ type itSeries struct { si SeriesIterator } -func (s itSeries) Iterator() SeriesIterator { return s.si } -func (s itSeries) Labels() labels.Labels { return labels.Labels{} } +func (s itSeries) Iterator() SeriesIterator { return s.si } +func (s itSeries) Labels() labels.Labels { return labels.Labels{} } +func (s itSeries) ChunkIterator() ChunkIterator { return nil } func TestSeriesIterator(t *testing.T) { itcases := []struct { @@ -1044,6 +1052,46 @@ func TestSeriesIterator(t *testing.T) { }) } +func TestChunkIterator(t *testing.T) { + it := &chunkIterator{} + testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) + + chks := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}), + } + it = &chunkIterator{chunks: chks} + testutil.Equals(t, chks, expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) +} + +func TestChainedChunkIterator(t *testing.T) { + it := &chainedChunkIterator{} + testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) + + chks1 := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}), + } + chks2 := []chunks.Meta(nil) + chks3 := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 1}, sample{4, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{5, 1}, sample{5, 2}}), + } + + it = &chainedChunkIterator{chain: []ChunkIterator{ + &chunkIterator{chunks: chks1}, + &chunkIterator{chunks: chks2}, + &chunkIterator{chunks: chks3}, + }} + testutil.Equals(t, append(chks1, chks3...), expandChunkIterator(it)) + testutil.Equals(t, false, it.Next()) +} + // Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { chkMetas := []chunks.Meta{ @@ -1439,8 +1487,9 @@ func newSeries(l map[string]string, s []tsdbutil.Sample) Series { iterator: func() SeriesIterator { return newListSeriesIterator(s) }, } } -func (m *mockSeries) Labels() labels.Labels { return m.labels() } -func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } +func (m *mockSeries) Labels() labels.Labels { return m.labels() } +func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } +func (m *mockSeries) ChunkIterator() ChunkIterator { return nil } type listSeriesIterator struct { list []tsdbutil.Sample