-
Notifications
You must be signed in to change notification settings - Fork 179
Add ChunksIterator method to Series interface. #665
base: master
Are you sure you want to change the base?
Changes from 1 commit
64eeed9
4ed00e1
d097d3f
c407499
af8fb41
6f6dd39
78e9e36
f09fb89
220c3dd
b84c439
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,9 @@ type Series interface { | |
|
||
// Iterator returns a new iterator of the data of the series. | ||
Iterator() SeriesIterator | ||
|
||
// ChunkIterator returns a new iterator of the chunks of the series. | ||
ChunkIterator() ChunkIterator | ||
} | ||
|
||
// querier aggregates querying results from time blocks within | ||
|
@@ -876,6 +879,10 @@ func (s *chunkSeries) Iterator() SeriesIterator { | |
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt) | ||
} | ||
|
||
func (s *chunkSeries) ChunkIterator() ChunkIterator { | ||
return &chunkIterator{chunks: s.chunks} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
However not sure what mint maxt means here in detail. Should we be strict and allow mint and maxt in terms of samples inside chunks as well? By the looks of this: https://github.com/prometheus/tsdb/blob/78e9e360581153d73b1c69d31fd6e2d89e060a82/querier.go#L796 we just check if they overlap with required min or maxt here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then, we pass to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it is fine to start with with the current interface if it covers the streaming use case. Don't see a problem to refactor when/if needed. |
||
} | ||
|
||
// SeriesIterator iterates over the data of a time series. | ||
type SeriesIterator interface { | ||
// Seek advances the iterator forward to the given timestamp. | ||
|
@@ -904,6 +911,14 @@ func (s *chainedSeries) Iterator() SeriesIterator { | |
return newChainedSeriesIterator(s.series...) | ||
} | ||
|
||
func (s *chainedSeries) ChunkIterator() ChunkIterator { | ||
ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))} | ||
for _, s := range s.series { | ||
ch.chain = append(ch.chain, s.ChunkIterator()) | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return ch | ||
} | ||
|
||
// chainedSeriesIterator implements a series iterater over a list | ||
// of time-sorted, non-overlapping iterators. | ||
type chainedSeriesIterator struct { | ||
|
@@ -975,6 +990,14 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator { | |
return newVerticalMergeSeriesIterator(s.series...) | ||
} | ||
|
||
func (s *verticalChainedSeries) ChunkIterator() ChunkIterator { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be useful to also add a test for this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a test that covers this? |
||
ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))} | ||
for _, s := range s.series { | ||
ch.chain = append(ch.chain, s.ChunkIterator()) | ||
codesome marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return ch | ||
} | ||
|
||
// verticalMergeSeriesIterator implements a series iterater over a list | ||
// of time-sorted, time-overlapping iterators. | ||
type verticalMergeSeriesIterator struct { | ||
|
@@ -1210,3 +1233,62 @@ type errSeriesSet struct { | |
func (s errSeriesSet) Next() bool { return false } | ||
func (s errSeriesSet) At() Series { return nil } | ||
func (s errSeriesSet) Err() error { return s.err } | ||
|
||
// ChunkIterator iterates over the chunk of a time series. | ||
type ChunkIterator interface { | ||
// At returns the meta. | ||
At() chunks.Meta | ||
// Next advances the iterator by one. | ||
Next() bool | ||
// Err returns optional error if Next is false. | ||
Err() error | ||
} | ||
|
||
type chunkIterator struct { | ||
chunks []chunks.Meta | ||
i int | ||
} | ||
|
||
func (c *chunkIterator) Next() bool { | ||
if c.i >= len(c.chunks) { | ||
return false | ||
} | ||
c.i++ | ||
return true | ||
} | ||
|
||
func (c *chunkIterator) At() chunks.Meta { | ||
return c.chunks[c.i-1] | ||
} | ||
|
||
func (c *chunkIterator) Err() error { return nil } | ||
|
||
type chainedChunkIterator struct { | ||
chain []ChunkIterator | ||
i int | ||
err error | ||
} | ||
|
||
func (c *chainedChunkIterator) Next() bool { | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if c.Err() != nil { | ||
return false | ||
} | ||
|
||
for c.i < len(c.chain) { | ||
if c.chain[c.i].Next() { | ||
return true | ||
} | ||
if err := c.chain[c.i].Err(); err != nil { | ||
c.err = err | ||
return false | ||
} | ||
c.i++ | ||
} | ||
return false | ||
} | ||
|
||
func (c *chainedChunkIterator) At() chunks.Meta { | ||
return c.chain[c.i].At() | ||
} | ||
|
||
func (c *chainedChunkIterator) Err() error { return c.err } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong paragraph?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failed rebase