-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Refactor block iterators, fix unconsolidated series iterator #1271
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
- Coverage 70.7% 70.6% -0.1%
========================================
Files 760 760
Lines 63918 63939 +21
========================================
- Hits 45194 45168 -26
- Misses 15792 15826 +34
- Partials 2932 2945 +13
Continue to review full report at Codecov.
|
5 similar comments
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
- Coverage 70.7% 70.6% -0.1%
========================================
Files 760 760
Lines 63918 63939 +21
========================================
- Hits 45194 45168 -26
- Misses 15792 15826 +34
- Partials 2932 2945 +13
Continue to review full report at Codecov.
|
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
- Coverage 70.7% 70.6% -0.1%
========================================
Files 760 760
Lines 63918 63939 +21
========================================
- Hits 45194 45168 -26
- Misses 15792 15826 +34
- Partials 2932 2945 +13
Continue to review full report at Codecov.
|
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
- Coverage 70.7% 70.6% -0.1%
========================================
Files 760 760
Lines 63918 63939 +21
========================================
- Hits 45194 45168 -26
- Misses 15792 15826 +34
- Partials 2932 2945 +13
Continue to review full report at Codecov.
|
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
- Coverage 70.7% 70.6% -0.1%
========================================
Files 760 760
Lines 63918 63939 +21
========================================
- Hits 45194 45168 -26
- Misses 15792 15826 +34
- Partials 2932 2945 +13
Continue to review full report at Codecov.
|
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
- Coverage 70.7% 70.6% -0.1%
========================================
Files 760 760
Lines 63918 63939 +21
========================================
- Hits 45194 45168 -26
- Misses 15792 15826 +34
- Partials 2932 2945 +13
Continue to review full report at Codecov.
|
Codecov Report
@@ Coverage Diff @@
## master #1271 +/- ##
========================================
+ Coverage 70.7% 70.7% +<.1%
========================================
Files 822 824 +2
Lines 70248 70319 +71
========================================
+ Hits 49714 49778 +64
- Misses 17305 17310 +5
- Partials 3229 3231 +2
Continue to review full report at Codecov.
|
src/query/block/column.go
Outdated
} | ||
|
||
func (m *columnBlockSeriesIter) Current() Series { | ||
return NewSeries(m.values, m.seriesMeta[m.idx]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just checking--does our usage of the iterator ensure that we never use the values
slice after another call to Next
?
i.e. nothing like:
firstValues := iter.Current()
iter.Next()
nextValues := iter.Current()
Maybe worth adding a comment to the Next
method saying that it invalidates previously returned arrays, and that users should copy if they need to keep that data around?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment to reflect this. I think failing anything else coming up with more priority, my next task would include adding pooling to these iterators, which should make it safer.
Also this one will only be used by LegacyBlockProcessing, which shouldn't be around much longer and is not the default, so less worried about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, just going to copy the array instead as this would be a nightmare to debug if it happens :p
Good catch!
@@ -46,122 +48,125 @@ type Block interface { | |||
// UnconsolidatedBlock represents a group of unconsolidated series across a time bound | |||
type UnconsolidatedBlock interface { | |||
io.Closer | |||
// StepIter returns an UnconsolidatedStepIter | |||
// StepIter returns a step-wise block iterator, giving unconsolidated values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for updating these to make them more informative :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
it.idx++ | ||
if it.idx == 0 { | ||
// decode the series on initial Next() call | ||
if success := it.decodeSeries(); !success { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I reading correctly that this change switches us back to eager decompression? What's the reasoning behind the switch back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, so the original implementation was incorrect, so had to fix it. I had a few questions with unconsolidated step iteration as a concept, since it's a bit odd, and was thinking of maybe just getting rid of it in general.
Talked offline and will fix this up to do it lazily the right way instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few places have
if next {
... do something
}
return next
Maybe consider flipping the check to avoid nesting.
@@ -115,23 +117,34 @@ func (c *colBlockIter) StepCount() int { | |||
|
|||
// Next returns true if iterator has more values remaining | |||
func (c *colBlockIter) Next() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe write comments regarding the flow ? Eg: check for Next() if that's false, then also check for Err() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM. Considering that these are interface methods though, might get rid of the comments here and make the iterator ones more meaningful
src/query/block/scalar.go
Outdated
if it.idx != 0 { | ||
return Series{}, fmt.Errorf("invalid scalar index: %d, numVals: 1", it.idx) | ||
// Indicates an error with the caller, having either not called Next() or attempted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In most other places, we don't check if Next() has been called before Current() ? Also, having a panic here while you also have an Err() field is confusing. Maybe just get rid of this check ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, just a lot of iterators work on top of slices so if you do something like call Current
before Next
or after Next
is false, you'll get an out of bounds panic, so was to replicate that. Happy to delete it, it's not really a useful case, had it here so that it would be easier to detect if we were doing something wrong in the caller.
@@ -46,122 +48,125 @@ type Block interface { | |||
// UnconsolidatedBlock represents a group of unconsolidated series across a time bound | |||
type UnconsolidatedBlock interface { | |||
io.Closer | |||
// StepIter returns an UnconsolidatedStepIter | |||
// StepIter returns a step-wise block iterator, giving unconsolidated values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
type SeriesMeta struct { | ||
Tags models.Tags | ||
Name string | ||
} | ||
|
||
// Iterator is the base iterator | ||
// Iterator is the base iterator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Write a bit about how the flow should work (as mentioned above) ?
it.idx++ | ||
next := it.idx < len(it.seriesIters) | ||
it.mu.Unlock() | ||
if next { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flip to reduce nesting ?
f5be93b
to
d889a97
Compare
@@ -237,19 +252,30 @@ func (m *columnBlockSeriesIter) SeriesCount() int { | |||
return len(cols[0].Values) | |||
} | |||
|
|||
func (m *columnBlockSeriesIter) Err() error { | |||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment that this is no-op
?
// Binary search to keep the start times sorted | ||
index := sort.Search(len(blockList), func(i int) bool { | ||
return blockList[i].meta.Bounds.Start.Before(blockMeta.Bounds.Start) | ||
return blockList[i].meta.Bounds.Start.After(blockMeta.Bounds.Start) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it was wrong; it sorted in the wrong order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, perhaps we could we add a test to ensure we don't regress this behavior?
Next() bool | ||
// Err returns any error encountered during iteration. | ||
Err() error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to expand the interface? Seems like calling just iter.err
to check the error is fine.
panic(err) | ||
c.timeForStep, c.err = c.meta.Bounds.TimeForIndex(c.idx) | ||
if c.err != nil { | ||
return false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log error here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changed iterator flow here sets the error to err
on the colBlockIter
; it's up to the caller to check iter.Error()
is nil after iteration's finished
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
} | ||
|
||
func (it *scalarStepIter) Current() Step { | ||
t := it.stepTime | ||
return &scalarStep{ | ||
vals: []float64{it.s(t)}, | ||
time: t, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: time: it.stepTime,
) { | ||
if dp.Timestamp.Before(c.earliestLookback) { | ||
// this datapoint is too far in the past, it can be dropped. | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth logging anything here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No; would be a lot of annoying spam
// be the correct behaviour; investigate if it should be converted to keep | ||
// the values instead. | ||
if c.reset[i] { | ||
c.datapoints[i] = c.datapoints[i][:0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spoke offline: clean this up a bit.
src/query/ts/m3db/convert.go
Outdated
blockSize time.Duration | ||
replicas []encoding.MultiReaderIterator | ||
} | ||
|
||
type seriesBlocks []seriesBlock | ||
|
||
func (b seriesBlock) String() string { | ||
return fmt.Sprint("BlockSize:", b.blockSize.Hours(), " blockStart:", | ||
b.blockStart.Format("3:04:05PM"), " readStart:", b.readStart.Format("3:04:05PM"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment explaining why you use "3:04:05PM"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's just how go rolls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add "3:04:05PM"
as a constant perhaps to top of this file, like blockTimeFormat = ...
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bump on this, can we use const blockTimeFormat = "3:04:05PM"
at the top of this file rather than inlining the format string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
@@ -83,7 +88,7 @@ func (b *encodedBlockBuilder) add( | |||
[]encoding.SeriesIterator{}, | |||
b.tagOptions, | |||
consolidation, | |||
time.Duration(0), | |||
b.lookback, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Timestamp: dp.Timestamp, | ||
Value: dp.Value, | ||
}} | ||
// If at boundary, add the point as the current value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do you add the point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit weird, but the point has been added to the consolidator at this point and it's already at the boundary so there's no reason to keep looking in the iterator until Next
is called again. I agree the comment here is a bit misleading/crap though, will fix it up
return storage.NewUnconsolidatedStep(it.stepTime, points) | ||
} | ||
|
||
func (it *encodedStepIterUnconsolidated) nextForStep( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add high level comment explaining why we need this function?
it.seriesPeek[i].started = true | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (it *encodedStepIterUnconsolidated) Next() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the Next()
functions are pretty similar for series and step, think it's worth moving the common parts into a common function?
i++ | ||
} | ||
|
||
assert.Equal(t, len(expected), i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert.Len()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately assert.Len gives pretty garbage output if it fails:
"[%!s(int=1) %!s(int=2) %!s(int=3)]" should have 100 item(s), but has 3
vs
Not equal: 100 (expected) != 3 (actual)
If expected
doesn't have good String methods, you'll get a bunch of unintelligible spam
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed with artem, assert.Len is pretty disappointing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool, good to know
src/query/README.md
Outdated
@@ -145,7 +146,8 @@ Input JSON: | |||
} | |||
} | |||
``` | |||
Full request: `curl -X POST 'localhost:7201/api/v1/debug/validate_query?start=1543431465&end=1543435045&step=14s&query=go_gc_duration_seconds%7Bquantile%3D%221%22%7D*2' -d @<input_json_file> --header "Content-Type: application/json"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, reverting this
src/query/README.md
Outdated
@@ -145,7 +146,8 @@ Input JSON: | |||
} | |||
} | |||
``` | |||
Full request: `curl -X POST 'localhost:7201/api/v1/debug/validate_query?start=1543431465&end=1543435045&step=14s&query=go_gc_duration_seconds%7Bquantile%3D%221%22%7D*2' -d @<input_json_file> --header "Content-Type: application/json"` | |||
|
|||
Full request: `curl -X POST 'http://localhost:7201/api/v1/debug/validate_query?query=sum_over_time(process_cpu_seconds_total%7Binstance%3D~%22m3db_seed%3A7203%22%7D%5B60s%5D)&start=1549035092&end=1549035177&step=1' -d @lol.json --header "Content-Type: application/json"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol
? lol
@@ -22,7 +22,6 @@ package m3db | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not in this PR, but we should organize all of these iterator files a bit better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's fair, it's kind of in a weird path right now
numValues := firstStepIter.StepCount() * len(blockList) | ||
numValues := 0 | ||
for _, block := range blockList { | ||
b, _ := block.block.StepIter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, need to check this for an error yeah? You can return err from this func so no problem to just propagate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, nice catch. Had added this path quick and dirty to test it was working correctly, then missed cleaning up and adding the error case here.
src/query/executor/transform/lazy.go
Outdated
|
||
next := s.iter.Next() | ||
if !next { | ||
s.err = s.iter.Err() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, instead of adding this here, perhaps in Err()
we can instead just return it as well when it's asked for? Just otherwise this logic is just a little bit more difficult to follow (also if the inner iterator gets an error for some other reason later, it will properly propagate, unlike here where it can only get set during a call to next?):
func (s *stepIter) Err() error {
if s.err != nil {
return s.err
}
return s.iter.Err()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one, will do 👍
src/query/executor/transform/lazy.go
Outdated
|
||
next := s.iter.Next() | ||
if !next { | ||
s.err = s.iter.Err() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
} | ||
|
||
func TestUnconsolidatedStep(t *testing.T) { | ||
expected := [][][]float64{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice 👍
} | ||
|
||
name, exists := tags.Name() | ||
if !exists { | ||
return nil, errors.New("metric name does not exist") | ||
// return nil, errors.New("metric name does not exist at all ever") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, don't we want to restore this return err? What possibility would we want to allow for empty series names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops nice catch, missed cleaning this check up.
As a little bit of background, this is the codepath for the validator endpoint, used for offline testing of disparities between Prom and m3query, specifically along the query execution path. The general flow for validation is:
- See discrepancy on query results between Prom and m3q, e.g.
sum(up)
- Run the raw data query against Prom, i.e.
up
- Hit the validation endpoint with the raw data and the prometheus query result to reproduce the discrepancy
This particular codepath is responsible for parsing both the raw data, as well as the prom query result�. Whereas the raw data will always contain the series name as a tag and pass this check, the query result may not have the name
tag if any of the functions in the query mutated the return tag list. This would fail the check here on parsing the tags from result, which would be the wrong behaviour; instead it checks that the tags returned at the end of m3query execution match Prom's, with or without the name
tag (depending on if Prom included it or not)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment explaining why it's fine to set to default
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah interesting. Thx for the detailed response.
|
||
// AccumulateAndMoveToNext consolidates the current values and moves the | ||
// consolidator to the next given value, purging stale values. | ||
func (c *StepLookbackAccumulator) AccumulateAndMoveToNext() []xts.Datapoints { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting 👍
@@ -123,6 +151,11 @@ func blockReplicasFromSeriesIterator( | |||
pools encoding.IteratorPools, | |||
) (seriesBlocks, error) { | |||
blocks := make(seriesBlocks, 0, bounds.Steps()) | |||
var pool encoding.MultiReaderIteratorPool | |||
if pools != nil { | |||
pool = pools.MultiReaderIterator() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉 yay pooling! Nw.
type encodedStepIter struct { | ||
mu sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: Removing the mutex, is there no concurrent access now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah there wasn't really any before either, concurrent access on an iterator is a bit weird but was going to be necessary for a proposed lookback fix for split blocks�.
@@ -0,0 +1,155 @@ | |||
// Copyright (c) 2018 Uber Technologies, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 2019
Name: []byte(name), | ||
Value: []byte(val), | ||
}) | ||
if name == metricName { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing!
// Update read starts for existing blocks. | ||
for i, bl := range blocks { | ||
blocks[i].readStart = iterStart | ||
fillSize := bl.blockStart.Add(bl.blockSize).Sub(iterStart) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull some of this common stuff out into a function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see what you meant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
Changes query's iterators to match M3 Iterators; i.e. not erroring on
Current
methods but having a dedicatedAlso moves all processing onto the
Next
function, allowingCurrent
to be a much simpler method in all iterators, and consolidating complexity inNext
Also fixes an error with unconsolidated series encoded block iterators