-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Refactor block iterators, fix unconsolidated series iterator #1271
Changes from 22 commits
79561c2
22e522d
9a9b99d
8efc545
033cc5e
d889a97
0c758bf
d4df67d
f60bd83
6aa8910
7f43627
1806f72
3ecb330
2aa13a3
2f78a99
df4dcd6
5b1cd45
2d93066
36e4b69
9beda82
402e21e
ff21f56
c93e14c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,7 +58,6 @@ func read( | |
// Results is closed by execute | ||
results := make(chan executor.Query) | ||
go engine.ExecuteExpr(ctx, parser, opts, params, results) | ||
|
||
// Block slices are sorted by start time | ||
// TODO: Pooling | ||
sortedBlockList := make([]blockWithMeta, 0, initialBlockAlloc) | ||
|
@@ -143,11 +142,6 @@ func sortedBlocksToSeriesList(blockList []blockWithMeta) ([]*ts.Series, error) { | |
} | ||
|
||
firstBlock := blockList[0].block | ||
firstStepIter, err := firstBlock.StepIter() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
firstSeriesIter, err := firstBlock.SeriesIter() | ||
if err != nil { | ||
return nil, err | ||
|
@@ -171,22 +165,35 @@ func sortedBlocksToSeriesList(blockList []blockWithMeta) ([]*ts.Series, error) { | |
seriesIters[i] = seriesIter | ||
} | ||
|
||
numValues := firstStepIter.StepCount() * len(blockList) | ||
numValues := 0 | ||
for _, block := range blockList { | ||
b, err := block.block.StepIter() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
numValues += b.StepCount() | ||
} | ||
|
||
for i := 0; i < numSeries; i++ { | ||
values := ts.NewFixedStepValues(bounds.StepSize, numValues, math.NaN(), bounds.Start) | ||
valIdx := 0 | ||
for idx, iter := range seriesIters { | ||
if !iter.Next() { | ||
if err = iter.Err(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return nil, fmt.Errorf("invalid number of datapoints for series: %d, block: %d", i, idx) | ||
} | ||
|
||
blockSeries, err := iter.Current() | ||
if err != nil { | ||
if err = iter.Err(); err != nil { | ||
return nil, err | ||
} | ||
|
||
for i := 0; i < blockSeries.Len(); i++ { | ||
values.SetValueAt(valIdx, blockSeries.ValueAtStep(i)) | ||
blockSeries := iter.Current() | ||
for j := 0; j < blockSeries.Len(); j++ { | ||
values.SetValueAt(valIdx, blockSeries.ValueAtStep(j)) | ||
valIdx++ | ||
} | ||
} | ||
|
@@ -224,27 +231,18 @@ func insertSortedBlock( | |
"the block, wanted: %d, found: %d", seriesCount, blockSeriesCount) | ||
} | ||
|
||
blockStepIter, err := b.StepIter() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
blockStepCount := blockStepIter.StepCount() | ||
if stepCount != blockStepCount { | ||
return nil, fmt.Errorf("mismatch in number of steps for the"+ | ||
"block, wanted: %d, found: %d", stepCount, blockStepCount) | ||
} | ||
|
||
// Binary search to keep the start times sorted | ||
index := sort.Search(len(blockList), func(i int) bool { | ||
return blockList[i].meta.Bounds.Start.Before(blockMeta.Bounds.Start) | ||
return blockList[i].meta.Bounds.Start.After(blockMeta.Bounds.Start) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why change this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because it was wrong; it sorted in the wrong order There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, perhaps we could we add a test to ensure we don't regress this behavior? |
||
}) | ||
|
||
// Append here ensures enough size in the slice | ||
blockList = append(blockList, blockWithMeta{}) | ||
copy(blockList[index+1:], blockList[index:]) | ||
blockList[index] = blockWithMeta{ | ||
block: b, | ||
meta: blockMeta, | ||
} | ||
|
||
return blockList, nil | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,17 +36,14 @@ type columnBlock struct { | |
seriesMeta []SeriesMeta | ||
} | ||
|
||
// Unconsolidated returns the unconsolidated version for the block | ||
func (c *columnBlock) Unconsolidated() (UnconsolidatedBlock, error) { | ||
return nil, fmt.Errorf("unconsolidated view not supported for block, meta: %s", c.meta) | ||
} | ||
|
||
// Meta returns the metadata for the block | ||
func (c *columnBlock) Meta() Metadata { | ||
return c.meta | ||
} | ||
|
||
// StepIter returns a StepIterator | ||
func (c *columnBlock) StepIter() (StepIter, error) { | ||
if len(c.columns) != c.meta.Bounds.Steps() { | ||
return nil, fmt.Errorf("mismatch in block columns and meta bounds, columns: %d, bounds: %v", len(c.columns), c.meta.Bounds) | ||
|
@@ -61,7 +58,6 @@ func (c *columnBlock) StepIter() (StepIter, error) { | |
} | ||
|
||
// TODO: allow series iteration | ||
// SeriesIter returns a SeriesIterator | ||
func (c *columnBlock) SeriesIter() (SeriesIter, error) { | ||
return newColumnBlockSeriesIter(c.columns, c.meta, c.seriesMeta), nil | ||
} | ||
|
@@ -78,12 +74,10 @@ func (c *columnBlock) WithMetadata( | |
} | ||
|
||
// TODO: allow series iteration | ||
// SeriesMeta returns the metadata for each series in the block | ||
func (c *columnBlock) SeriesMeta() []SeriesMeta { | ||
return c.seriesMeta | ||
} | ||
|
||
// StepCount returns the total steps | ||
func (c *columnBlock) StepCount() int { | ||
return len(c.columns) | ||
} | ||
|
@@ -95,10 +89,12 @@ func (c *columnBlock) Close() error { | |
} | ||
|
||
type colBlockIter struct { | ||
columns []column | ||
seriesMeta []SeriesMeta | ||
meta Metadata | ||
idx int | ||
idx int | ||
timeForStep time.Time | ||
err error | ||
meta Metadata | ||
seriesMeta []SeriesMeta | ||
columns []column | ||
} | ||
|
||
func (c *colBlockIter) SeriesMeta() []SeriesMeta { | ||
|
@@ -113,30 +109,38 @@ func (c *colBlockIter) StepCount() int { | |
return len(c.columns) | ||
} | ||
|
||
// Next returns true if iterator has more values remaining | ||
func (c *colBlockIter) Next() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe write comments regarding the flow ? Eg: check for Next() if that's false, then also check for Err() ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM. Considering that these are interface methods though, might get rid of the comments here and make the iterator ones more meaningful |
||
if c.err != nil { | ||
return false | ||
} | ||
|
||
c.idx++ | ||
return c.idx < len(c.columns) | ||
} | ||
next := c.idx < len(c.columns) | ||
if !next { | ||
return false | ||
} | ||
|
||
// Current returns the current step | ||
func (c *colBlockIter) Current() (Step, error) { | ||
col := c.columns[c.idx] | ||
t, err := c.meta.Bounds.TimeForIndex(c.idx) | ||
// TODO: Test panic case | ||
if err != nil { | ||
panic(err) | ||
c.timeForStep, c.err = c.meta.Bounds.TimeForIndex(c.idx) | ||
if c.err != nil { | ||
return false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log error here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The changed iterator flow here sets the error to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. |
||
} | ||
|
||
return next | ||
} | ||
|
||
func (c *colBlockIter) Err() error { | ||
return c.err | ||
} | ||
|
||
func (c *colBlockIter) Current() Step { | ||
col := c.columns[c.idx] | ||
return ColStep{ | ||
time: t, | ||
time: c.timeForStep, | ||
values: col.Values, | ||
}, nil | ||
} | ||
} | ||
|
||
// Close frees up resources | ||
func (c *colBlockIter) Close() { | ||
} | ||
func (c *colBlockIter) Close() { /*no-op*/ } | ||
|
||
// ColStep is a single column containing data from multiple series at a given time step | ||
type ColStep struct { | ||
|
@@ -210,18 +214,29 @@ type column struct { | |
|
||
// columnBlockSeriesIter is used to iterate over a column. Assumes that all columns have the same length | ||
type columnBlockSeriesIter struct { | ||
columns []column | ||
idx int | ||
blockMeta Metadata | ||
values []float64 | ||
columns []column | ||
seriesMeta []SeriesMeta | ||
} | ||
|
||
func (m *columnBlockSeriesIter) Meta() Metadata { | ||
return m.blockMeta | ||
} | ||
|
||
func newColumnBlockSeriesIter(columns []column, blockMeta Metadata, seriesMeta []SeriesMeta) SeriesIter { | ||
return &columnBlockSeriesIter{columns: columns, blockMeta: blockMeta, seriesMeta: seriesMeta, idx: -1} | ||
func newColumnBlockSeriesIter( | ||
columns []column, | ||
blockMeta Metadata, | ||
seriesMeta []SeriesMeta, | ||
) SeriesIter { | ||
return &columnBlockSeriesIter{ | ||
columns: columns, | ||
blockMeta: blockMeta, | ||
seriesMeta: seriesMeta, | ||
idx: -1, | ||
values: make([]float64, len(columns)), | ||
} | ||
} | ||
|
||
func (m *columnBlockSeriesIter) SeriesMeta() []SeriesMeta { | ||
|
@@ -237,19 +252,31 @@ func (m *columnBlockSeriesIter) SeriesCount() int { | |
return len(cols[0].Values) | ||
} | ||
|
||
func (m *columnBlockSeriesIter) Err() error { | ||
// no-op | ||
return nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comment that this is |
||
} | ||
|
||
func (m *columnBlockSeriesIter) Next() bool { | ||
m.idx++ | ||
return m.idx < m.SeriesCount() | ||
} | ||
next := m.idx < m.SeriesCount() | ||
if !next { | ||
return false | ||
} | ||
|
||
func (m *columnBlockSeriesIter) Current() (Series, error) { | ||
cols := m.columns | ||
values := make([]float64, len(cols)) | ||
for i := 0; i < len(cols); i++ { | ||
values[i] = cols[i].Values[m.idx] | ||
m.values[i] = cols[i].Values[m.idx] | ||
} | ||
|
||
return NewSeries(values, m.seriesMeta[m.idx]), nil | ||
return next | ||
} | ||
|
||
func (m *columnBlockSeriesIter) Current() Series { | ||
// TODO: pool these | ||
vals := make([]float64, len(m.values)) | ||
copy(vals, m.values) | ||
return NewSeries(vals, m.seriesMeta[m.idx]) | ||
} | ||
|
||
// TODO: Actually free resources once we do pooling | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, reverting this