Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Fix AggregateQuery limits #3112

Merged
merged 22 commits into from
Jan 23, 2021
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
476dbd4
[dbnode] AggregateQuery limit fix
arnikola Jan 22, 2021
5a99682
Cleanup
arnikola Jan 22, 2021
a0ca9db
[integration test] Add label query limits integration test
wesleyk Jan 22, 2021
5d2173d
Add paren
wesleyk Jan 22, 2021
c4ee6c6
Fixed exhaustive case, remove dead code
arnikola Jan 22, 2021
522543c
Merge branch 'arnikola/fix-limits' of github.com:m3db/m3 into arnikola/fix-limits
arnikola Jan 22, 2021
ebfaa35
Aggregate results changes
arnikola Jan 22, 2021
849c92b
Add proper require exhaustive conversion + integration test for agg query
wesleyk Jan 22, 2021
bd35ca9
Merge branch 'arnikola/fix-limits' of github.com:m3db/m3 into arnikola/fix-limits
wesleyk Jan 22, 2021
cad0e06
Merge branch 'master' into arnikola/fix-limits
wesleyk Jan 22, 2021
829e6b3
Avoid flakiness with high limits
wesleyk Jan 22, 2021
31f001b
Limit on docs or inserts
arnikola Jan 22, 2021
172100f
Fixup integration test
wesleyk Jan 22, 2021
2695b90
Merge branch 'arnikola/fix-limits' of github.com:m3db/m3 into arnikola/fix-limits
wesleyk Jan 22, 2021
84d5a86
Add more precise assertions to label query limits integration test
wesleyk Jan 22, 2021
dd5ae2e
Finish test fixes and refactor
arnikola Jan 22, 2021
b18484e
Response + lint
arnikola Jan 22, 2021
ab538cd
Improve IT comments
wesleyk Jan 22, 2021
505319c
Merge branch 'master' into arnikola/fix-limits
wesleyk Jan 22, 2021
b3fdbba
Response + lint
arnikola Jan 22, 2021
4221a99
Fix integrations
arnikola Jan 22, 2021
704b172
Merge branch 'arnikola/fix-limits' of github.com:m3db/m3 into arnikola/fix-limits
arnikola Jan 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Response + lint
  • Loading branch information
arnikola committed Jan 22, 2021
commit b3fdbba2e211e661a2294db16a07a97ef4ce30b2
25 changes: 10 additions & 15 deletions src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
@@ -110,13 +110,13 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
r.Lock()
defer r.Unlock()

maxDocs := int(math.MaxInt64)
remainingDocs := int(math.MaxInt64)
if r.aggregateOpts.DocsLimit != 0 {
maxDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount
remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount
}

// NB: already hit doc limit.
if maxDocs <= 0 {
if remainingDocs <= 0 {
for idx := 0; idx < len(batch); idx++ {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so since we hit the limit, we essentially have to clean up this entire batch?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, exactly

batch[idx].Field.Finalize()
for _, term := range batch[idx].Terms {
@@ -128,22 +128,17 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
}

// NB: cannot insert more than max docs, so that acts as the upper bound here.
maxInserts := maxDocs
remainingInserts := remainingDocs
if r.aggregateOpts.SizeLimit != 0 {
if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < maxInserts {
maxInserts = remaining
if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < remainingInserts {
remainingInserts = remaining
}
}

limitTripped := false
docs := 0
numInserts := 0
for _, entry := range batch {
if docs >= maxDocs || numInserts >= maxInserts {
limitTripped = true
}

if limitTripped {
if docs >= remainingDocs || numInserts >= remainingInserts {
entry.Field.Finalize()
for _, term := range entry.Terms {
term.Finalize()
@@ -158,7 +153,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
f := entry.Field
aggValues, ok := r.resultsMap.Get(f)
if !ok {
if maxInserts > numInserts {
if remainingInserts > numInserts {
numInserts++
aggValues = r.valuesPool.Get()
// we can avoid the copy because we assume ownership of the passed ident.ID,
@@ -180,12 +175,12 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)

valuesMap := aggValues.Map()
for _, t := range entry.Terms {
if maxDocs > docs {
if remainingDocs > docs {
docs++
if !valuesMap.Contains(t) {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
if maxInserts > numInserts {
if remainingInserts > numInserts {
valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
4 changes: 2 additions & 2 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
@@ -645,10 +645,10 @@ func (b *block) aggregateWithSpan(
size = results.Size()
resultCount = results.TotalDocsCount()
batch = b.opts.AggregateResultsEntryArrayPool().Get()
numAdded = 0
currBatchSize = 0
maxBatch = cap(batch)
iterClosed = false // tracking whether we need to free the iterator at the end.
currBatchSize int
numAdded int
)
if maxBatch == 0 {
maxBatch = defaultAggregateResultsEntryBatchSize