Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Add ChunksIterator method to Series interface.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
tomwilkie authored and bwplotka committed Jul 29, 2019
1 parent d5b3f07 commit 64eeed9
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s.
- `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`.
- [FEATURE] `chunckenc.Chunk.Iterator` method now takes a `chunckenc.Iterator` interface as an argument for reuse.
- [CHANGE] `Series` interface allows return chunk iterator that allows iterating over encoded chunks.

## 0.9.1

Expand Down
82 changes: 82 additions & 0 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Series interface {

// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator

// ChunkIterator returns a new iterator of the chunks of the series.
ChunkIterator() ChunkIterator
}

// querier aggregates querying results from time blocks within
Expand Down Expand Up @@ -876,6 +879,10 @@ func (s *chunkSeries) Iterator() SeriesIterator {
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
}

func (s *chunkSeries) ChunkIterator() ChunkIterator {
return &chunkIterator{chunks: s.chunks}
}

// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
Expand Down Expand Up @@ -904,6 +911,14 @@ func (s *chainedSeries) Iterator() SeriesIterator {
return newChainedSeriesIterator(s.series...)
}

func (s *chainedSeries) ChunkIterator() ChunkIterator {
ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))}
for _, s := range s.series {
ch.chain = append(ch.chain, s.ChunkIterator())
}
return ch
}

// chainedSeriesIterator implements a series iterater over a list
// of time-sorted, non-overlapping iterators.
type chainedSeriesIterator struct {
Expand Down Expand Up @@ -975,6 +990,14 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator {
return newVerticalMergeSeriesIterator(s.series...)
}

func (s *verticalChainedSeries) ChunkIterator() ChunkIterator {
ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))}
for _, s := range s.series {
ch.chain = append(ch.chain, s.ChunkIterator())
}
return ch
}

// verticalMergeSeriesIterator implements a series iterater over a list
// of time-sorted, time-overlapping iterators.
type verticalMergeSeriesIterator struct {
Expand Down Expand Up @@ -1210,3 +1233,62 @@ type errSeriesSet struct {
func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }

// ChunkIterator iterates over the chunk of a time series.
type ChunkIterator interface {
// At returns the meta.
At() chunks.Meta
// Next advances the iterator by one.
Next() bool
// Err returns optional error if Next is false.
Err() error
}

type chunkIterator struct {
chunks []chunks.Meta
i int
}

func (c *chunkIterator) Next() bool {
if c.i >= len(c.chunks) {
return false
}
c.i++
return true
}

func (c *chunkIterator) At() chunks.Meta {
return c.chunks[c.i-1]
}

func (c *chunkIterator) Err() error { return nil }

type chainedChunkIterator struct {
chain []ChunkIterator
i int
err error
}

func (c *chainedChunkIterator) Next() bool {
if c.Err() != nil {
return false
}

for c.i < len(c.chain) {
if c.chain[c.i].Next() {
return true
}
if err := c.chain[c.i].Err(); err != nil {
c.err = err
return false
}
c.i++
}
return false
}

func (c *chainedChunkIterator) At() chunks.Meta {
return c.chain[c.i].At()
}

func (c *chainedChunkIterator) Err() error { return c.err }
57 changes: 53 additions & 4 deletions querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) {
return r, it.Err()
}

func expandChunkIterator(it ChunkIterator) (chks []chunks.Meta) {
for it.Next() {
chks = append(chks, it.At())
}
return chks
}

type seriesSamples struct {
lset map[string]string
chunks [][]sample
Expand Down Expand Up @@ -661,8 +668,9 @@ type itSeries struct {
si SeriesIterator
}

func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func (s itSeries) ChunkIterator() ChunkIterator { return nil }

func TestSeriesIterator(t *testing.T) {
itcases := []struct {
Expand Down Expand Up @@ -1044,6 +1052,46 @@ func TestSeriesIterator(t *testing.T) {
})
}

func TestChunkIterator(t *testing.T) {
it := &chunkIterator{}
testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it))
testutil.Equals(t, false, it.Next())

chks := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}),
}
it = &chunkIterator{chunks: chks}
testutil.Equals(t, chks, expandChunkIterator(it))
testutil.Equals(t, false, it.Next())
}

func TestChainedChunkIterator(t *testing.T) {
it := &chainedChunkIterator{}
testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it))
testutil.Equals(t, false, it.Next())

chks1 := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}),
}
chks2 := []chunks.Meta(nil)
chks3 := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 1}, sample{4, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{5, 1}, sample{5, 2}}),
}

it = &chainedChunkIterator{chain: []ChunkIterator{
&chunkIterator{chunks: chks1},
&chunkIterator{chunks: chks2},
&chunkIterator{chunks: chks3},
}}
testutil.Equals(t, append(chks1, chks3...), expandChunkIterator(it))
testutil.Equals(t, false, it.Next())
}

// Regression for: https://github.com/prometheus/tsdb/pull/97
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
chkMetas := []chunks.Meta{
Expand Down Expand Up @@ -1439,8 +1487,9 @@ func newSeries(l map[string]string, s []tsdbutil.Sample) Series {
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
func (m *mockSeries) ChunkIterator() ChunkIterator { return nil }

type listSeriesIterator struct {
list []tsdbutil.Sample
Expand Down

0 comments on commit 64eeed9

Please sign in to comment.