Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add ChunksIterator method to Series interface. #665

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions chunkenc/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ type Appender interface {
// Iterator iterates over the data of a time series.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
type Iterator interface {
// Seek advances the iterator forward to the sample with the timestamp t or first value after t.
// If the current iterator points to the sample with timestamp after t already,
// Seek should not advance the iterator.
// If the current iterator points to the sample with timestamp after t already, Seek should not advance the iterator.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
// Seek returns false if there is no such sample with the timestamp equal or larger than t.
// Iterator can be exhausted when the Seek returns false.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (int64, float64)
Expand Down
2 changes: 1 addition & 1 deletion chunkenc/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func testChunk(t *testing.T, c Chunk) {
testutil.Ok(t, it3.Err())
testutil.Equals(t, all[mid:], res3)

testutil.Equals(t, false, it3.Seek(all[len(all)-1].t + 1))
testutil.Equals(t, false, it3.Seek(all[len(all)-1].t+1))
}

func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
Expand Down
65 changes: 41 additions & 24 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ type Series interface {
ChunkIterator() ChunkIterator
}

// ChunkIterator iterates over the chunk of a time series.
type ChunkIterator interface {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to be near Series interface.

// Seek advances the iterator forward to the given timestamp.
// It advances to the chunk with min time at t or first chunk with min time after t.
Seek(t int64) bool
// At returns the meta.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"current value". This makes it sounds like it's dropping memes :)

At() chunks.Meta
// Next advances the iterator by one.
Next() bool
// Err returns optional error if Next is false.
Err() error
}

// querier aggregates querying results from time blocks within
// a single partition.
type querier struct {
Expand Down Expand Up @@ -942,7 +955,7 @@ func (it *chainedSeriesIterator) Next() bool {
if it.cur.Next() {
return true
}
if err := it.cur.Err(); err != nil {
if it.cur.Err() != nil {
return false
}
if it.i == len(it.series)-1 {
Expand Down Expand Up @@ -1007,6 +1020,10 @@ func newVerticalMergeSeriesIterator(s ...Series) chunkenc.Iterator {
}

func (it *verticalMergeSeriesIterator) Seek(t int64) bool {
if it.initialized && it.curT >= t {
return true
}

it.aok, it.bok = it.a.Seek(t), it.b.Seek(t)
it.initialized = true
return it.Next()
Expand Down Expand Up @@ -1157,7 +1174,7 @@ func mergeOverlappingChunks(a, b chunks.Meta, aReuseIter, bReuseIter chunkenc.It
}

mint := int64(math.MaxInt64)
maxt := int64(0)
maxt := int64(math.MinInt64)

// TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
for seriesIter.Next() {
Expand Down Expand Up @@ -1243,38 +1260,51 @@ func (it *chunkSeriesIterator) resetCurIterator() {
it.cur = it.bufDelIter
}

func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
if t > it.maxt {
func (it *chunkSeriesIterator) Seek(t int64) bool {
if it.Err() != nil || t > it.maxt || it.i > len(it.chunks)-1 {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
// Exhaust iterator.
it.i = len(it.chunks)
return false
}

// Seek to the first valid value after t.
if t < it.mint {
t = it.mint
}

currI := it.i
for ; it.chunks[it.i].MaxTime < t; it.i++ {
if it.i == len(it.chunks)-1 {
// Exhaust iterator.
it.i = len(it.chunks)
return false
}
}

it.resetCurIterator()
if currI != it.i {
it.resetCurIterator()
}

for it.cur.Next() {
t0, _ := it.cur.At()
if t0 >= t {
return true
tc, _ := it.cur.At()
for t > tc {
if !it.cur.Next() {
// Exhaust iterator.
it.i = len(it.chunks)
return false
}
tc, _ = it.cur.At()
}
return false
return true
}

func (it *chunkSeriesIterator) At() (t int64, v float64) {
return it.cur.At()
}

func (it *chunkSeriesIterator) Next() bool {
if it.Err() != nil || it.i > len(it.chunks)-1 {
return false
}

if it.cur.Next() {
t, _ := it.cur.At()

Expand Down Expand Up @@ -1369,19 +1399,6 @@ func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }

// ChunkIterator iterates over the chunk of a time series.
type ChunkIterator interface {
// Seek advances the iterator forward to the given timestamp.
// It advances to the chunk with min time at t or first chunk with min time after t.
Seek(t int64) bool
// At returns the meta.
At() chunks.Meta
// Next advances the iterator by one.
Next() bool
// Err returns optional error if Next is false.
Err() error
}

type chunkIterator struct {
chunks []chunks.Meta // series in time order
i int
Expand Down
Loading