Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Refactor block iterators, fix unconsolidated series iterator #1271

Merged
merged 23 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
79561c2
Rebase on master
arnikola Jan 4, 2019
22e522d
Refactor iterators WIP
arnikola Dec 28, 2018
9a9b99d
Finish refactor, fix output values from unconsolidated step iterator
arnikola Dec 29, 2018
8efc545
PR response flipping nesting and comments
arnikola Jan 11, 2019
033cc5e
PR response
arnikola Jan 12, 2019
d889a97
Adding files.
arnikola Jan 12, 2019
0c758bf
Merge branch 'master' into arnikola/refactor-block-iterators
arnikola Jan 14, 2019
d4df67d
Fixes to some boundary issues causing incorrect results.
arnikola Jan 17, 2019
f60bd83
wip
arnikola Jan 24, 2019
6aa8910
Merge branch 'master' into arnikola/refactor-block-iterators
arnikola Feb 1, 2019
7f43627
Fix to temporal functions, cleanup.
arnikola Feb 1, 2019
1806f72
Merge branch 'master' into arnikola/refactor-block-iterators
arnikola Feb 1, 2019
3ecb330
Fixing merge issues
arnikola Feb 1, 2019
2aa13a3
PR responses
arnikola Feb 4, 2019
2f78a99
Refactoring encoded step iterators
arnikola Feb 4, 2019
df4dcd6
Refactored encoded step iterator.
arnikola Feb 5, 2019
5b1cd45
Merge branch 'master' into arnikola/refactor-block-iterators
arnikola Feb 5, 2019
2d93066
Merge branch 'master' into arnikola/refactor-block-iterators
arnikola Feb 5, 2019
36e4b69
Merge branch 'master' into arnikola/refactor-block-iterators
arnikola Feb 6, 2019
9beda82
Refactoring step iterators
arnikola Feb 6, 2019
402e21e
Finish refactoring encoded step iterators
arnikola Feb 7, 2019
ff21f56
Metalint fix
arnikola Feb 7, 2019
c93e14c
PR responses
arnikola Feb 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/query/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Users can validate m3query's Prometheus results for a given query against input
Query: `go_gc_duration_seconds{quantile="1"} * 2`

Input JSON:

```
{
"input": {
Expand Down Expand Up @@ -145,6 +146,7 @@ Input JSON:
}
}
```

Full request: `curl -X POST 'localhost:7201/api/v1/debug/validate_query?start=1543431465&end=1543435045&step=14s&query=go_gc_duration_seconds%7Bquantile%3D%221%22%7D*2' -d @<input_json_file> --header "Content-Type: application/json"`

[doc-img]: https://godoc.org/github.com/m3db/m3/src/query?status.svg
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// ServeHTTPWithEngine returns query results from the storage
func (h *PromReadHandler) ServeHTTPWithEngine(w http.ResponseWriter, r *http.Request, engine *executor.Engine) ([]*ts.Series, models.RequestParams, *RespError) {
func (h *PromReadHandler) ServeHTTPWithEngine(
w http.ResponseWriter,
r *http.Request, engine *executor.Engine,
) ([]*ts.Series, models.RequestParams, *RespError) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)

Expand Down
44 changes: 21 additions & 23 deletions src/query/api/v1/handler/prometheus/native/read_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func read(
// Results is closed by execute
results := make(chan executor.Query)
go engine.ExecuteExpr(ctx, parser, opts, params, results)

// Block slices are sorted by start time
// TODO: Pooling
sortedBlockList := make([]blockWithMeta, 0, initialBlockAlloc)
Expand Down Expand Up @@ -143,11 +142,6 @@ func sortedBlocksToSeriesList(blockList []blockWithMeta) ([]*ts.Series, error) {
}

firstBlock := blockList[0].block
firstStepIter, err := firstBlock.StepIter()
if err != nil {
return nil, err
}

firstSeriesIter, err := firstBlock.SeriesIter()
if err != nil {
return nil, err
Expand All @@ -171,22 +165,35 @@ func sortedBlocksToSeriesList(blockList []blockWithMeta) ([]*ts.Series, error) {
seriesIters[i] = seriesIter
}

numValues := firstStepIter.StepCount() * len(blockList)
numValues := 0
for _, block := range blockList {
b, err := block.block.StepIter()
if err != nil {
return nil, err
}

numValues += b.StepCount()
}

for i := 0; i < numSeries; i++ {
values := ts.NewFixedStepValues(bounds.StepSize, numValues, math.NaN(), bounds.Start)
valIdx := 0
for idx, iter := range seriesIters {
if !iter.Next() {
if err = iter.Err(); err != nil {
return nil, err
}

return nil, fmt.Errorf("invalid number of datapoints for series: %d, block: %d", i, idx)
}

blockSeries, err := iter.Current()
if err != nil {
if err = iter.Err(); err != nil {
return nil, err
}

for i := 0; i < blockSeries.Len(); i++ {
values.SetValueAt(valIdx, blockSeries.ValueAtStep(i))
blockSeries := iter.Current()
for j := 0; j < blockSeries.Len(); j++ {
values.SetValueAt(valIdx, blockSeries.ValueAtStep(j))
valIdx++
}
}
Expand Down Expand Up @@ -224,27 +231,18 @@ func insertSortedBlock(
"the block, wanted: %d, found: %d", seriesCount, blockSeriesCount)
}

blockStepIter, err := b.StepIter()
if err != nil {
return nil, err
}

blockStepCount := blockStepIter.StepCount()
if stepCount != blockStepCount {
return nil, fmt.Errorf("mismatch in number of steps for the"+
"block, wanted: %d, found: %d", stepCount, blockStepCount)
}

// Binary search to keep the start times sorted
index := sort.Search(len(blockList), func(i int) bool {
return blockList[i].meta.Bounds.Start.Before(blockMeta.Bounds.Start)
return blockList[i].meta.Bounds.Start.After(blockMeta.Bounds.Start)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it was wrong; it sorted in the wrong order

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, perhaps we could we add a test to ensure we don't regress this behavior?

})

// Append here ensures enough size in the slice
blockList = append(blockList, blockWithMeta{})
copy(blockList[index+1:], blockList[index:])
blockList[index] = blockWithMeta{
block: b,
meta: blockMeta,
}

return blockList, nil
}
38 changes: 32 additions & 6 deletions src/query/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 61 additions & 34 deletions src/query/block/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,14 @@ type columnBlock struct {
seriesMeta []SeriesMeta
}

// Unconsolidated returns the unconsolidated version for the block
func (c *columnBlock) Unconsolidated() (UnconsolidatedBlock, error) {
return nil, fmt.Errorf("unconsolidated view not supported for block, meta: %s", c.meta)
}

// Meta returns the metadata for the block
func (c *columnBlock) Meta() Metadata {
return c.meta
}

// StepIter returns a StepIterator
func (c *columnBlock) StepIter() (StepIter, error) {
if len(c.columns) != c.meta.Bounds.Steps() {
return nil, fmt.Errorf("mismatch in block columns and meta bounds, columns: %d, bounds: %v", len(c.columns), c.meta.Bounds)
Expand All @@ -61,7 +58,6 @@ func (c *columnBlock) StepIter() (StepIter, error) {
}

// TODO: allow series iteration
// SeriesIter returns a SeriesIterator
func (c *columnBlock) SeriesIter() (SeriesIter, error) {
return newColumnBlockSeriesIter(c.columns, c.meta, c.seriesMeta), nil
}
Expand All @@ -78,12 +74,10 @@ func (c *columnBlock) WithMetadata(
}

// TODO: allow series iteration
// SeriesMeta returns the metadata for each series in the block
func (c *columnBlock) SeriesMeta() []SeriesMeta {
return c.seriesMeta
}

// StepCount returns the total steps
func (c *columnBlock) StepCount() int {
return len(c.columns)
}
Expand All @@ -95,10 +89,12 @@ func (c *columnBlock) Close() error {
}

type colBlockIter struct {
columns []column
seriesMeta []SeriesMeta
meta Metadata
idx int
idx int
timeForStep time.Time
err error
meta Metadata
seriesMeta []SeriesMeta
columns []column
}

func (c *colBlockIter) SeriesMeta() []SeriesMeta {
Expand All @@ -113,30 +109,38 @@ func (c *colBlockIter) StepCount() int {
return len(c.columns)
}

// Next returns true if iterator has more values remaining
func (c *colBlockIter) Next() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe write comments regarding the flow ? Eg: check for Next() if that's false, then also check for Err() ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Considering that these are interface methods though, might get rid of the comments here and make the iterator ones more meaningful

if c.err != nil {
return false
}

c.idx++
return c.idx < len(c.columns)
}
next := c.idx < len(c.columns)
if !next {
return false
}

// Current returns the current step
func (c *colBlockIter) Current() (Step, error) {
col := c.columns[c.idx]
t, err := c.meta.Bounds.TimeForIndex(c.idx)
// TODO: Test panic case
if err != nil {
panic(err)
c.timeForStep, c.err = c.meta.Bounds.TimeForIndex(c.idx)
if c.err != nil {
return false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log error here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changed iterator flow here sets the error to err on the colBlockIter; it's up to the caller to check iter.Error() is nil after iteration's finished

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

}

return next
}

func (c *colBlockIter) Err() error {
return c.err
}

func (c *colBlockIter) Current() Step {
col := c.columns[c.idx]
return ColStep{
time: t,
time: c.timeForStep,
values: col.Values,
}, nil
}
}

// Close frees up resources
func (c *colBlockIter) Close() {
}
func (c *colBlockIter) Close() { /*no-op*/ }

// ColStep is a single column containing data from multiple series at a given time step
type ColStep struct {
Expand Down Expand Up @@ -210,18 +214,29 @@ type column struct {

// columnBlockSeriesIter is used to iterate over a column. Assumes that all columns have the same length
type columnBlockSeriesIter struct {
columns []column
idx int
blockMeta Metadata
values []float64
columns []column
seriesMeta []SeriesMeta
}

func (m *columnBlockSeriesIter) Meta() Metadata {
return m.blockMeta
}

func newColumnBlockSeriesIter(columns []column, blockMeta Metadata, seriesMeta []SeriesMeta) SeriesIter {
return &columnBlockSeriesIter{columns: columns, blockMeta: blockMeta, seriesMeta: seriesMeta, idx: -1}
func newColumnBlockSeriesIter(
columns []column,
blockMeta Metadata,
seriesMeta []SeriesMeta,
) SeriesIter {
return &columnBlockSeriesIter{
columns: columns,
blockMeta: blockMeta,
seriesMeta: seriesMeta,
idx: -1,
values: make([]float64, len(columns)),
}
}

func (m *columnBlockSeriesIter) SeriesMeta() []SeriesMeta {
Expand All @@ -237,19 +252,31 @@ func (m *columnBlockSeriesIter) SeriesCount() int {
return len(cols[0].Values)
}

func (m *columnBlockSeriesIter) Err() error {
// no-op
return nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment that this is no-op?

}

func (m *columnBlockSeriesIter) Next() bool {
m.idx++
return m.idx < m.SeriesCount()
}
next := m.idx < m.SeriesCount()
if !next {
return false
}

func (m *columnBlockSeriesIter) Current() (Series, error) {
cols := m.columns
values := make([]float64, len(cols))
for i := 0; i < len(cols); i++ {
values[i] = cols[i].Values[m.idx]
m.values[i] = cols[i].Values[m.idx]
}

return NewSeries(values, m.seriesMeta[m.idx]), nil
return next
}

func (m *columnBlockSeriesIter) Current() Series {
// TODO: pool these
vals := make([]float64, len(m.values))
copy(vals, m.values)
return NewSeries(vals, m.seriesMeta[m.idx])
}

// TODO: Actually free resources once we do pooling
Expand Down
Loading