[dbnode] Fix AggregateQuery limits #3112
Changes from 2 commits
```diff
@@ -123,6 +123,7 @@ func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions {

 func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) {
 	r.Lock()
+	maxInsertions := r.aggregateOpts.SizeLimit - r.totalDocsCount
 	valueInsertions := 0
 	for _, entry := range batch {
 		f := entry.Field
@@ -145,11 +146,17 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 		if !valuesMap.Contains(t) {
 			// we can avoid the copy because we assume ownership of the passed ident.ID,
 			// but still need to finalize it.
-			valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
-				NoCopyKey:     true,
-				NoFinalizeKey: false,
-			})
-			valueInsertions++
+			if maxInsertions > valueInsertions {
+				valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
+					NoCopyKey:     true,
+					NoFinalizeKey: false,
+				})
+				valueInsertions++
+			} else {
+				// this value exceeds the limit, so should be released to the underling
+				// pool without adding to the map.
+				t.Finalize()
+			}
 		} else {
 			// because we already have a entry for this term, we release the ident back to
 			// the underlying pool.
```

> **Review comment** on `// ... released to the underling pool ...`: underlying
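Stepping outside the diff for a moment: the pattern this hunk introduces is to cap insertions at the remaining limit and finalize (release back to the pool) anything past it, so capped values don't leak. Below is a minimal runnable sketch of that pattern; `releasable`, `addCapped`, and `capacityRemaining` are illustrative names, not identifiers from the PR.

```go
package main

import "fmt"

// releasable stands in for ident.ID: a pooled value that must be
// finalized (returned to its pool) whenever the map does not retain it.
// Hypothetical type for illustration only.
type releasable struct{ key string }

// Finalize would return the value to its pool; a no-op in this sketch.
func (r releasable) Finalize() {}

// addCapped mirrors the new AddFields behavior: insert until the
// remaining capacity is used up, and finalize everything else so
// nothing leaks from the pool.
func addCapped(seen map[string]struct{}, batch []releasable, capacityRemaining int) int {
	inserted := 0
	for _, v := range batch {
		if _, ok := seen[v.key]; ok {
			v.Finalize() // duplicate term: release straight back to the pool
			continue
		}
		if inserted < capacityRemaining {
			seen[v.key] = struct{}{}
			inserted++
		} else {
			v.Finalize() // over the limit: release without inserting
		}
	}
	return inserted
}

func main() {
	seen := map[string]struct{}{}
	batch := []releasable{{"a"}, {"b"}, {"b"}, {"c"}}
	fmt.Println(addCapped(seen, batch, 2)) // prints 2; "c" is dropped by the cap
}
```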
```diff
@@ -641,15 +641,16 @@ func (b *block) aggregateWithSpan(
 	}

 	var (
-		source     = opts.Source
-		size       = results.Size()
-		docsCount  = results.TotalDocsCount()
-		batch      = b.opts.AggregateResultsEntryArrayPool().Get()
-		batchSize  = cap(batch)
-		iterClosed = false // tracking whether we need to free the iterator at the end.
+		source      = opts.Source
+		size        = results.Size()
+		resultCount = results.TotalDocsCount()
+		batch       = AggregateResultsEntries(b.opts.AggregateResultsEntryArrayPool().Get())
+		maxBatch    = cap(batch)
+		iterClosed  = false // tracking whether we need to free the iterator at the end.
+		exhaustive  = false
 	)
-	if batchSize == 0 {
-		batchSize = defaultAggregateResultsEntryBatchSize
+	if maxBatch == 0 {
+		maxBatch = defaultAggregateResultsEntryBatchSize
 	}

 	// cleanup at the end
```

> **Review comment** on `exhaustive = false`: may not need this
>
> **Reply:** yeah seems like
```diff
@@ -675,8 +676,17 @@
 		}))
 	}

+	if opts.SeriesLimit < maxBatch {
+		maxBatch = opts.SeriesLimit
+	}
+
+	if opts.DocsLimit < maxBatch {
+		maxBatch = opts.DocsLimit
+	}
+
 	for _, reader := range readers {
-		if opts.LimitsExceeded(size, docsCount) {
+		if opts.LimitsExceeded(size, resultCount) {
+			exhaustive = true
 			break
 		}
```
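The two new `if` blocks clamp the batch capacity so that a single flush can never overshoot either configured limit. A hedged sketch of the same clamping as a free function; `clampBatch` and `defaultBatchSize` are stand-ins, and, like the diff, the sketch assumes both limits are set to positive values (a zero "unlimited" limit would clamp the batch to nothing).

```go
// defaultBatchSize stands in for defaultAggregateResultsEntryBatchSize;
// the value here is illustrative.
const defaultBatchSize = 256

// clampBatch mirrors the clamping above: start from the pooled slice's
// capacity and bound it by whichever configured limit is tighter.
func clampBatch(poolCap, seriesLimit, docsLimit int) int {
	maxBatch := poolCap
	if maxBatch == 0 {
		maxBatch = defaultBatchSize
	}
	if seriesLimit < maxBatch {
		maxBatch = seriesLimit
	}
	if docsLimit < maxBatch {
		maxBatch = docsLimit
	}
	return maxBatch
}
```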
```diff
@@ -685,19 +695,19 @@
 			return false, err
 		}
 		iterClosed = false // only once the iterator has been successfully Reset().

 		for iter.Next() {
-			if opts.LimitsExceeded(size, docsCount) {
+			if opts.LimitsExceeded(size, resultCount) {
+				exhaustive = true
 				break
 			}

 			field, term := iter.Current()
 			batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms)
-			if len(batch) < batchSize {
+			if batch.Size() < maxBatch {
 				continue
 			}

-			batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source)
+			batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source)
 			if err != nil {
 				return false, err
 			}
```

> **Review comment** on `if batch.Size() < maxBatch {`: can we add a comment on this check?
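The check the reviewer asks about is an accumulate-then-flush guard: entries are buffered until the batch reaches its clamped capacity, then handed off in one call. A sketch of that loop shape under this reading; the `flush` callback is hypothetical and stands in for `b.addAggregateResults`, which hands the batch back truncated to length zero for reuse.

```go
// accumulateAndFlush mirrors the loop structure in aggregateWithSpan:
// buffer entries until the batch is full, flush, then drain whatever
// remains once iteration ends.
func accumulateAndFlush(
	entries []string,
	maxBatch int,
	flush func([]string) ([]string, error),
) error {
	batch := make([]string, 0, maxBatch)
	for _, e := range entries {
		batch = append(batch, e)
		if len(batch) < maxBatch {
			continue // batch not full yet: keep accumulating
		}
		var err error
		if batch, err = flush(batch); err != nil {
			return err
		}
	}
	// Drain the final partial batch, matching the trailing
	// `for len(batch) > 0` loop in the diff.
	for len(batch) > 0 {
		var err error
		if batch, err = flush(batch); err != nil {
			return err
		}
	}
	return nil
}
```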
```diff
@@ -714,14 +724,14 @@
 	}

 	// Add last batch to results if remaining.
-	if len(batch) > 0 {
-		batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source)
+	for len(batch) > 0 {
+		batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source)
 		if err != nil {
 			return false, err
 		}
 	}

-	return opts.exhaustive(size, docsCount), nil
+	return exhaustive || opts.exhaustive(size, resultCount), nil
 }

 func (b *block) appendFieldAndTermToBatch(
```

> **Review comment** on `// Add last batch to results if remaining.`: comment could be updated
```diff
@@ -783,6 +793,7 @@ func (b *block) appendFieldAndTermToBatch(
 	} else {
 		batch = append(batch, entry)
 	}
+
 	return batch
 }
```
```diff
@@ -797,12 +808,12 @@ func (b *block) pooledID(id []byte) ident.ID {
 func (b *block) addAggregateResults(
 	cancellable *xresource.CancellableLifetime,
 	results AggregateResults,
-	batch []AggregateResultsEntry,
+	batch AggregateResultsEntries,
 	source []byte,
-) ([]AggregateResultsEntry, int, int, error) {
+) (AggregateResultsEntries, int, int, error) {
 	// update recently queried docs to monitor memory.
 	if results.EnforceLimits() {
-		if err := b.docsLimit.Inc(len(batch), source); err != nil {
+		if err := b.docsLimit.Inc(batch.Size(), source); err != nil {
 			return batch, 0, 0, err
 		}
 	}
```

> **Review comment** on `b.docsLimit.Inc(batch.Size(), source)`: Will have to make a decision on how docs limit is affected in aggregate results; this is not correct
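The objection reads as a units mismatch: a batch holds aggregate entries (a field plus the terms observed for it), and neither the entry count nor the field/term-pair count equals the number of documents the query actually touched, which is what a docs limit is meant to meter. A sketch of the two batch-level counts, with hypothetical types:

```go
// entry mirrors the shape of AggregateResultsEntry: one field plus the
// terms observed for it. Hypothetical type for illustration only.
type entry struct {
	field string
	terms []string
}

// entryCount charges the limit per field entry in the batch.
func entryCount(batch []entry) int { return len(batch) }

// termCount charges it per field/term pair instead. Neither figure is
// the number of documents read, which is the gap the review comment
// is pointing at.
func termCount(batch []entry) (n int) {
	for _, e := range batch {
		n += len(e.terms)
	}
	return n
}
```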
```diff
@@ -814,8 +825,8 @@
 		return batch, 0, 0, errCancelledQuery
 	}

-	// try to add the docs to the xresource.
-	size, docsCount := results.AddFields(batch)
+	// try to add the docs to the resource.
+	size, resultCount := results.AddFields(batch)

 	// immediately release the checkout on the lifetime of query.
 	cancellable.ReleaseCheckout()
```
```diff
@@ -828,7 +839,7 @@
 	batch = batch[:0]

 	// return results.
-	return batch, size, docsCount, nil
+	return batch, size, resultCount, nil
 }

 func (b *block) AddResults(
```
> **Review comment** on `maxInsertions := r.aggregateOpts.SizeLimit - r.totalDocsCount`: should we be subtracting `totalDocsCount` or `r.resultsMap.size()`?
>
> **Reply:** Yeah, going to do some refactoring/renaming around this to make it clearer what each limit is; `totalDocsCount` is not quite correctly calculated at the moment, so will need to make a few touchups around it.
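For concreteness, the two baselines being debated only agree while every stored entry corresponds to exactly one counted document; once a document yields several field/term entries (or the counter is miscalculated, as conceded above), they drift apart. A hedged sketch with illustrative names:

```go
// remainingBySize charges the size limit against the entries actually
// held in the results map, i.e. SizeLimit - resultsMap.Len().
func remainingBySize(sizeLimit, resultsMapLen int) int {
	return sizeLimit - resultsMapLen
}

// remainingByDocs charges it against the running document counter, as
// the diff currently does with totalDocsCount.
func remainingByDocs(sizeLimit, totalDocsCount int) int {
	return sizeLimit - totalDocsCount
}
```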