Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add ChunksIterator method to Series interface. #665

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s.
- `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`.
- [FEATURE] `chunckenc.Chunk.Iterator` method now takes a `chunckenc.Iterator` interface as an argument for reuse.
- [CHANGE] `Series` interface allows return chunk iterator that allows iterating over encoded chunks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong paragraph?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failed rebase


## 0.9.1

Expand Down
228 changes: 226 additions & 2 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tsdb

import (
"fmt"
"math"
"sort"
"strings"
"unicode/utf8"
Expand Down Expand Up @@ -54,6 +55,9 @@ type Series interface {

// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator

// ChunkIterator returns a new iterator for the non-overlapping chunks of the series.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
ChunkIterator() ChunkIterator
}

// querier aggregates querying results from time blocks within
Expand Down Expand Up @@ -876,6 +880,10 @@ func (s *chunkSeries) Iterator() SeriesIterator {
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
}

func (s *chunkSeries) ChunkIterator() ChunkIterator {
return &chunkIterator{chunks: s.chunks}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newChunkSeriesIterator accepts mint and maxt. Should we do the same here? @brian-brazil @krasi-georgiev ? I think we should.

However not sure what mint maxt means here in detail. Should we be strict and allow mint and maxt in terms of samples inside chunks as well?

By the looks of this: https://github.com/prometheus/tsdb/blob/78e9e360581153d73b1c69d31fd6e2d89e060a82/querier.go#L796 we just check if they overlap with required min or maxt here.

Copy link
Contributor Author

@bwplotka bwplotka Aug 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, we pass to newChunkSeriesIterator with mint maxt so I am pretty sure we need to be strict here as well in ChunkIterator. E.g when we query block [0, 20)with querier restricted to[10, 40]we need to return chunk[10,19]even if the block has[0, 19]on thisChunkIterator` level, I assume.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newChunkSeriesIterator accepts mint and maxt. Should we do the same here? @brian-brazil @krasi-georgiev ? I think we should.

I think it is fine to start with with the current interface if it covers the streaming use case. Don't see a problem to refactor when/if needed.

}

// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
Expand Down Expand Up @@ -904,7 +912,15 @@ func (s *chainedSeries) Iterator() SeriesIterator {
return newChainedSeriesIterator(s.series...)
}

// chainedSeriesIterator implements a series iterater over a list
func (s *chainedSeries) ChunkIterator() ChunkIterator {
ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))}
for _, s := range s.series {
ch.chain = append(ch.chain, s.ChunkIterator())
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
}
return ch
}

// chainedSeriesIterator implements a series iterated over a list
// of time-sorted, non-overlapping iterators.
type chainedSeriesIterator struct {
series []Series // series in time order
Expand Down Expand Up @@ -975,7 +991,11 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator {
return newVerticalMergeSeriesIterator(s.series...)
}

// verticalMergeSeriesIterator implements a series iterater over a list
func (s *verticalChainedSeries) ChunkIterator() ChunkIterator {
Copy link
Contributor

@krasi-georgiev krasi-georgiev Aug 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be useful to also add a test for this.
It will help visualize what output is expected with a given input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test that covers this?

return newVerticalMergeChunkIterator(s.series...)
}

// verticalMergeSeriesIterator implements a series iterator over a list
// of time-sorted, time-overlapping iterators.
type verticalMergeSeriesIterator struct {
a, b SeriesIterator
Expand Down Expand Up @@ -1055,6 +1075,141 @@ func (it *verticalMergeSeriesIterator) Err() error {
return it.b.Err()
}

type noSeekSeriesIterator struct {
chunkenc.Iterator
err error
}

func (it *noSeekSeriesIterator) Seek(t int64) bool {
it.err = errors.New("not implemented: Seek method invoked for noSeekSeriesIterator")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather prefer having an inefficient implementation as @brian-brazil said instead of an error here.

return false
}

func (it *noSeekSeriesIterator) Err() error {
if it.err != nil {
return it.err
}
return it.Iterator.Err()
}

// verticalMergeChunkIterator implements a ChunkIterator over a list
// of time-sorted, time-overlapping chunk iterators for the same labels (same series).
// Any overlap in chunks will be merged using verticalMergeSeriesIterator.
type verticalMergeChunkIterator struct {
a, b ChunkIterator
aok, bok, initialized bool

curMeta chunks.Meta
err error

aReuseIter, bReuseIter chunkenc.Iterator
}

func newVerticalMergeChunkIterator(s ...Series) ChunkIterator {
if len(s) == 1 {
return s[0].ChunkIterator()
} else if len(s) == 2 {
return &verticalMergeChunkIterator{
a: s[0].ChunkIterator(),
b: s[1].ChunkIterator(),
}
}
return &verticalMergeChunkIterator{
a: s[0].ChunkIterator(),
b: newVerticalMergeChunkIterator(s[1:]...),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this really hard to understand. I know it is used in the verticalMergeSeriesIterator as well, but will be really happy if we simplify this at some point.
Not a blocker for this PR thought. I just thought I would mention it 😈

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recursive approach is sometimes hard to follow indeed. Let's postpone cleaning this though, as we do this everywhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did run a debug session and the final struct is really bizarre, sort of deeply nested.

But yeah a conversation for another PR.

}
}

func (it *verticalMergeChunkIterator) Next() bool {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
if !it.initialized {
it.aok = it.a.Next()
it.bok = it.b.Next()
it.initialized = true
}

if !it.aok && !it.bok {
return false
}

if !it.aok {
it.curMeta = it.b.At()
it.bok = it.b.Next()
return true
}
if !it.bok {
it.curMeta = it.a.At()
it.aok = it.a.Next()
return true
}

aCurMeta := it.a.At()
bCurMeta := it.b.At()

if aCurMeta.MaxTime < bCurMeta.MinTime {
it.curMeta = aCurMeta
it.aok = it.a.Next()
return true
}

if bCurMeta.MaxTime < aCurMeta.MinTime {
it.curMeta = bCurMeta
it.bok = it.b.Next()
return true
}

chk := chunkenc.NewXORChunk()
app, err := chk.Appender()
if err != nil {
it.err = err
return false
}
seriesIter := &verticalMergeSeriesIterator{
a: &noSeekSeriesIterator{Iterator: aCurMeta.Chunk.Iterator(it.aReuseIter)},
b: &noSeekSeriesIterator{Iterator: bCurMeta.Chunk.Iterator(it.bReuseIter)},
}

mint := int64(math.MaxInt64)
maxt := int64(0)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

// TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
for seriesIter.Next() {
t, v := seriesIter.At()
app.Append(t, v)

maxt = t
if mint == math.MaxInt64 {
mint = t
}
}
if err := seriesIter.Err(); err != nil {
it.err = err
return false
}

it.curMeta = chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
}
it.aok = it.a.Next()
it.bok = it.b.Next()
return true
}

func (it *verticalMergeChunkIterator) At() chunks.Meta {
return it.curMeta
}

func (it *verticalMergeChunkIterator) Err() error {
if it.err != nil {
return it.err
}
if it.a.Err() != nil {
return it.a.Err()
}
return it.b.Err()
}

// chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct {
Expand Down Expand Up @@ -1210,3 +1365,72 @@ type errSeriesSet struct {
func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }

// ChunkIterator iterates over the chunk of a time series.
type ChunkIterator interface {
// At returns the meta.
At() chunks.Meta
// Next advances the iterator by one.
Next() bool
// Err returns optional error if Next is false.
Err() error
}

type errChunkIterator struct {
err error
}

func (s errChunkIterator) Next() bool { return false }
func (s errChunkIterator) At() chunks.Meta { return chunks.Meta{} }
func (s errChunkIterator) Err() error { return s.err }

type chunkIterator struct {
chunks []chunks.Meta // series in time order
i int
}

func (c *chunkIterator) Next() bool {
if c.i >= len(c.chunks) {
return false
}
c.i++
return true
}

func (c *chunkIterator) At() chunks.Meta {
return c.chunks[c.i-1]
}

func (c *chunkIterator) Err() error { return nil }

// chainedChunkIterator implements flat iteration for chunks iterated over a list
// of time-sorted, non-overlapping iterators for each series.
type chainedChunkIterator struct {
chain []ChunkIterator // chunk iterators for each series in time order
i int
err error
}

func (c *chainedChunkIterator) Next() bool {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
if c.Err() != nil {
return false
}

for c.i < len(c.chain) {
if c.chain[c.i].Next() {
return true
}
if err := c.chain[c.i].Err(); err != nil {
c.err = err
return false
}
c.i++
}
return false
}

func (c *chainedChunkIterator) At() chunks.Meta {
return c.chain[c.i].At()
}

func (c *chainedChunkIterator) Err() error { return c.err }
57 changes: 53 additions & 4 deletions querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) {
return r, it.Err()
}

func expandChunkIterator(it ChunkIterator) (chks []chunks.Meta) {
for it.Next() {
chks = append(chks, it.At())
}
return chks
}

type seriesSamples struct {
lset map[string]string
chunks [][]sample
Expand Down Expand Up @@ -661,8 +668,9 @@ type itSeries struct {
si SeriesIterator
}

func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func (s itSeries) ChunkIterator() ChunkIterator { return nil }

func TestSeriesIterator(t *testing.T) {
itcases := []struct {
Expand Down Expand Up @@ -1044,6 +1052,46 @@ func TestSeriesIterator(t *testing.T) {
})
}

func TestChunkIterator(t *testing.T) {
it := &chunkIterator{}
testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it))
testutil.Equals(t, false, it.Next())

chks := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}),
}
it = &chunkIterator{chunks: chks}
testutil.Equals(t, chks, expandChunkIterator(it))
testutil.Equals(t, false, it.Next())
}

func TestChainedChunkIterator(t *testing.T) {
it := &chainedChunkIterator{}
testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it))
testutil.Equals(t, false, it.Next())

chks1 := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}),
}
chks2 := []chunks.Meta(nil)
chks3 := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 1}, sample{4, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{5, 1}, sample{5, 2}}),
}

it = &chainedChunkIterator{chain: []ChunkIterator{
&chunkIterator{chunks: chks1},
&chunkIterator{chunks: chks2},
&chunkIterator{chunks: chks3},
}}
testutil.Equals(t, append(chks1, chks3...), expandChunkIterator(it))
testutil.Equals(t, false, it.Next())
}

// Regression for: https://github.com/prometheus/tsdb/pull/97
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
chkMetas := []chunks.Meta{
Expand Down Expand Up @@ -1439,8 +1487,9 @@ func newSeries(l map[string]string, s []tsdbutil.Sample) Series {
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
func (m *mockSeries) ChunkIterator() ChunkIterator { return nil }

type listSeriesIterator struct {
list []tsdbutil.Sample
Expand Down