From 476dbd4645cd0eaf82fba1d21b32907bf6078799 Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 00:38:21 -0500 Subject: [PATCH 01/16] [dbnode] AggregateQuery limit fix --- src/dbnode/storage/index/aggregate_results.go | 41 +++++++----- .../aggregate_results_entry_arraypool_gen.go | 14 ++++ src/dbnode/storage/index/block.go | 65 ++++++++++++------- 3 files changed, 81 insertions(+), 39 deletions(-) diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index b2bd4ea908..9c08d395bf 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -42,8 +42,8 @@ type aggregatedResults struct { nsID ident.ID aggregateOpts AggregateResultsOptions - resultsMap *AggregateResultsMap - totalDocsCount int + resultsMap *AggregateResultsMap + totalResultCount int idPool ident.Pool bytesPool pool.CheckedBytesPool @@ -100,7 +100,7 @@ func (r *aggregatedResults) Reset( // reset all keys in the map next r.resultsMap.Reset() - r.totalDocsCount = 0 + r.totalResultCount = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. @@ -111,10 +111,11 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) r.Lock() err := r.addDocumentsBatchWithLock(batch) size := r.resultsMap.Len() - docsCount := r.totalDocsCount + len(batch) - r.totalDocsCount = docsCount + resultCount := r.totalResultCount + len(batch) + r.totalResultCount = resultCount + fmt.Println("Size", size, "resultCount", resultCount, "BatchLen", len(batch)) r.Unlock() - return size, docsCount, err + return size, resultCount, err } func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { @@ -123,6 +124,7 @@ func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() + maxInsertions := r.aggregateOpts.SizeLimit - r.totalResultCount valueInsertions := 0 for _, entry := range batch { f := entry.Field @@ -145,11 +147,18 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) if !valuesMap.Contains(t) { // we can avoid the copy because we assume ownership of the passed ident.ID, // but still need to finalize it. - valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) - valueInsertions++ + if maxInsertions > valueInsertions { + valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + valueInsertions++ + fmt.Println("max", maxInsertions, "value", valueInsertions) + } else { + // this value exceeds the limit, so should be released to the underling + // pool without adding to the map. + t.Finalize() + } } else { // because we already have a entry for this term, we release the ident back to // the underlying pool. 
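// Editorial sketch, not part of the patch: a minimal, self-contained
// illustration of the capping rule the hunk above adds to AddFields. The
// names insertCapped, sizeLimit, runningCount, and release are hypothetical
// stand-ins for the real aggregateOpts.SizeLimit, totalResultCount, and
// ident.ID.Finalize; the real method also holds the results lock and reuses
// pooled value maps.
func insertCapped(
	values map[string]struct{},
	terms []string,
	sizeLimit int,
	runningCount int,
	release func(term string),
) int {
	maxInsertions := sizeLimit - runningCount
	inserted := 0
	for _, t := range terms {
		if _, ok := values[t]; ok {
			// Duplicate term: hand the ident straight back to its pool.
			release(t)
			continue
		}
		if inserted < maxInsertions {
			values[t] = struct{}{}
			inserted++
		} else {
			// Over the limit: release to the pool without adding to the map.
			release(t)
		}
	}
	return inserted
}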
@@ -158,10 +167,10 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } } size := r.resultsMap.Len() - docsCount := r.totalDocsCount + valueInsertions - r.totalDocsCount = docsCount + resultCount := r.totalResultCount + valueInsertions + r.totalResultCount = resultCount r.Unlock() - return size, docsCount + return size, resultCount } func (r *aggregatedResults) addDocumentsBatchWithLock( @@ -319,9 +328,9 @@ func (r *aggregatedResults) Size() int { return l } -func (r *aggregatedResults) TotalDocsCount() int { +func (r *aggregatedResults) totalResultCount() int { r.RLock() - count := r.totalDocsCount + count := r.totalResultCount r.RUnlock() return count } diff --git a/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go b/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go index 66ac84180c..1e8f98e59a 100644 --- a/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go +++ b/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go @@ -48,6 +48,20 @@ import ( // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// AggregateResultsEntries is a slice of AggregateResultsEntry. +type AggregateResultsEntries []AggregateResultsEntry + +// Size is the element size of the aggregated result entries. +func (e AggregateResultsEntries) Size() int { + // NB: add 1 to the entries length for each entry's field. + length := len(e) + for _, entry := range e { + length += len(entry.Terms) + } + + return length +} + // AggregateResultsEntryArrayPool provides a pool for aggregateResultsEntry slices. type AggregateResultsEntryArrayPool interface { // Init initializes the array pool, it needs to be called diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index cbb641e6e5..11ade352cd 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -641,15 +641,16 @@ func (b *block) aggregateWithSpan( } var ( - source = opts.Source - size = results.Size() - docsCount = results.TotalDocsCount() - batch = b.opts.AggregateResultsEntryArrayPool().Get() - batchSize = cap(batch) - iterClosed = false // tracking whether we need to free the iterator at the end. + source = opts.Source + size = results.Size() + resultCount = results.TotalDocsCount() + batch = AggregateResultsEntries(b.opts.AggregateResultsEntryArrayPool().Get()) + maxBatch = cap(batch) + iterClosed = false // tracking whether we need to free the iterator at the end. + exhaustive = false ) - if batchSize == 0 { - batchSize = defaultAggregateResultsEntryBatchSize + if maxBatch == 0 { + maxBatch = defaultAggregateResultsEntryBatchSize } // cleanup at the end @@ -675,8 +676,20 @@ func (b *block) aggregateWithSpan( })) } - for _, reader := range readers { - if opts.LimitsExceeded(size, docsCount) { + if opts.SeriesLimit < maxBatch { + maxBatch = opts.SeriesLimit + } + + if opts.DocsLimit < maxBatch { + maxBatch = opts.DocsLimit + } + + for idx, reader := range readers { + if size > 0 || resultCount > 0 { + fmt.Println("reader", idx, "Size", size, "resultCount", resultCount) + } + if opts.LimitsExceeded(size, resultCount) { + exhaustive = true break } @@ -685,19 +698,22 @@ func (b *block) aggregateWithSpan( return false, err } iterClosed = false // only once the iterator has been successfully Reset(). 
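// Editorial sketch, not part of the patch: a simplified picture of the
// batching flow aggregateWithSpan follows once this hunk (continued below)
// is applied. entry, weight, and flush are hypothetical stand-ins for
// AggregateResultsEntry, AggregateResultsEntries.Size, and
// addAggregateResults; maxBatch is assumed to have already been clamped to
// the smallest of the pooled slice capacity (or the default batch size), the
// series limit, and the docs limit, as shown above.
type entry struct {
	field string
	terms []string
}

// weight mirrors AggregateResultsEntries.Size: one slot for each field plus
// one per term.
func weight(batch []entry) int {
	n := len(batch)
	for _, e := range batch {
		n += len(e.terms)
	}
	return n
}

// drainBatched flushes the accumulating batch whenever its weight reaches
// maxBatch, then flushes whatever remains after iteration. flush is expected
// to return the batch reset to zero length, as addAggregateResults does.
func drainBatched(entries []entry, maxBatch int, flush func([]entry) []entry) {
	var batch []entry
	for _, e := range entries {
		batch = append(batch, e)
		if weight(batch) < maxBatch {
			continue
		}
		batch = flush(batch)
	}
	for len(batch) > 0 {
		batch = flush(batch)
	}
}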
- for iter.Next() { - if opts.LimitsExceeded(size, docsCount) { + if opts.LimitsExceeded(size, resultCount) { + exhaustive = true break } field, term := iter.Current() + before := len(batch) batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - if len(batch) < batchSize { + fmt.Println("Adding term", string(field), string(term), "before", before, "len", len(batch), "iterateTerms", iterateTerms, "Size batch", batch.Size()) + if batch.Size() < maxBatch { continue } - batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source) + batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source) + fmt.Println("Added aggregate results", size, "resultCount", resultCount) if err != nil { return false, err } @@ -714,14 +730,16 @@ func (b *block) aggregateWithSpan( } // Add last batch to results if remaining. - if len(batch) > 0 { - batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source) + for len(batch) > 0 { + fmt.Println("Adding aggregate results at end", size, "resultCount", resultCount, "Batch size", len(batch)) + batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source) + fmt.Println("Added aggregate results at end", size, "resultCount", resultCount, "Batch size", len(batch)) if err != nil { return false, err } } - return opts.exhaustive(size, docsCount), nil + return exhaustive || opts.exhaustive(size, resultCount), nil } func (b *block) appendFieldAndTermToBatch( @@ -783,6 +801,7 @@ func (b *block) appendFieldAndTermToBatch( } else { batch = append(batch, entry) } + return batch } @@ -797,12 +816,12 @@ func (b *block) pooledID(id []byte) ident.ID { func (b *block) addAggregateResults( cancellable *xresource.CancellableLifetime, results AggregateResults, - batch []AggregateResultsEntry, + batch AggregateResultsEntries, source []byte, -) ([]AggregateResultsEntry, int, int, error) { +) (AggregateResultsEntries, int, int, error) { // update recently queried docs to monitor memory. if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch), source); err != nil { + if err := b.docsLimit.Inc(batch.Size(), source); err != nil { return batch, 0, 0, err } } @@ -814,8 +833,8 @@ func (b *block) addAggregateResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the xresource. - size, docsCount := results.AddFields(batch) + // try to add the docs to the resource. + size, resultCount := results.AddFields(batch) // immediately release the checkout on the lifetime of query. cancellable.ReleaseCheckout() @@ -828,7 +847,7 @@ func (b *block) addAggregateResults( batch = batch[:0] // return results. 
- return batch, size, docsCount, nil + return batch, size, resultCount, nil } func (b *block) AddResults( From 5a9968256a4b60a770aa8237bcc276520912d915 Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 00:41:07 -0500 Subject: [PATCH 02/16] Cleanup --- src/dbnode/storage/index/aggregate_results.go | 26 +++++++++---------- src/dbnode/storage/index/block.go | 10 +------ 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 9c08d395bf..0de716c15d 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -42,8 +42,8 @@ type aggregatedResults struct { nsID ident.ID aggregateOpts AggregateResultsOptions - resultsMap *AggregateResultsMap - totalResultCount int + resultsMap *AggregateResultsMap + totalDocsCount int idPool ident.Pool bytesPool pool.CheckedBytesPool @@ -100,7 +100,7 @@ func (r *aggregatedResults) Reset( // reset all keys in the map next r.resultsMap.Reset() - r.totalResultCount = 0 + r.totalDocsCount = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. @@ -111,11 +111,10 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) r.Lock() err := r.addDocumentsBatchWithLock(batch) size := r.resultsMap.Len() - resultCount := r.totalResultCount + len(batch) - r.totalResultCount = resultCount - fmt.Println("Size", size, "resultCount", resultCount, "BatchLen", len(batch)) + docsCount := r.totalDocsCount + len(batch) + r.totalDocsCount = docsCount r.Unlock() - return size, resultCount, err + return size, docsCount, err } func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { @@ -124,7 +123,7 @@ func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() - maxInsertions := r.aggregateOpts.SizeLimit - r.totalResultCount + maxInsertions := r.aggregateOpts.SizeLimit - r.totalDocsCount valueInsertions := 0 for _, entry := range batch { f := entry.Field @@ -153,7 +152,6 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) NoFinalizeKey: false, }) valueInsertions++ - fmt.Println("max", maxInsertions, "value", valueInsertions) } else { // this value exceeds the limit, so should be released to the underling // pool without adding to the map. 
@@ -167,10 +165,10 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } } size := r.resultsMap.Len() - resultCount := r.totalResultCount + valueInsertions - r.totalResultCount = resultCount + docsCount := r.totalDocsCount + valueInsertions + r.totalDocsCount = docsCount r.Unlock() - return size, resultCount + return size, docsCount } func (r *aggregatedResults) addDocumentsBatchWithLock( @@ -328,9 +326,9 @@ func (r *aggregatedResults) Size() int { return l } -func (r *aggregatedResults) totalResultCount() int { +func (r *aggregatedResults) TotalDocsCount() int { r.RLock() - count := r.totalResultCount + count := r.totalDocsCount r.RUnlock() return count } diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 11ade352cd..76d902967a 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -684,10 +684,7 @@ func (b *block) aggregateWithSpan( maxBatch = opts.DocsLimit } - for idx, reader := range readers { - if size > 0 || resultCount > 0 { - fmt.Println("reader", idx, "Size", size, "resultCount", resultCount) - } + for _, reader := range readers { if opts.LimitsExceeded(size, resultCount) { exhaustive = true break @@ -705,15 +702,12 @@ func (b *block) aggregateWithSpan( } field, term := iter.Current() - before := len(batch) batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - fmt.Println("Adding term", string(field), string(term), "before", before, "len", len(batch), "iterateTerms", iterateTerms, "Size batch", batch.Size()) if batch.Size() < maxBatch { continue } batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source) - fmt.Println("Added aggregate results", size, "resultCount", resultCount) if err != nil { return false, err } @@ -731,9 +725,7 @@ func (b *block) aggregateWithSpan( // Add last batch to results if remaining. 
for len(batch) > 0 { - fmt.Println("Adding aggregate results at end", size, "resultCount", resultCount, "Batch size", len(batch)) batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source) - fmt.Println("Added aggregate results at end", size, "resultCount", resultCount, "Batch size", len(batch)) if err != nil { return false, err } From a0ca9dbcce80524235b18cb520f56f5be3d73a5a Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 22 Jan 2021 12:00:01 -0500 Subject: [PATCH 03/16] [integration test] Add label query limits integration test --- .../prometheus/test.sh | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 1d3482c232..4cea36b857 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -391,6 +391,36 @@ function test_series { '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } +function test_label_query_limits_applied { + # Test the default series limit applied when directly querying + # coordinator (series limit set by header) + echo "Test label series limit with coordinator limit header" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]' + + echo "Test label series limit with require-exhaustive headers false" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]' + + echo "Test label series limit with require-exhaustive headers true (below limit therefore no error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values = "200" ]]' + + # Test the default docs limit applied when directly querying + # coordinator (docs limit set by header) + echo "Test label docs limit with coordinator limit header" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]' + + echo "Test label docs limit with require-exhaustive headers false" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]' + + echo "Test label docs limit with require-exhaustive headers true (below limit therefore no error)" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values = "200" ]]' +} + echo "Running readiness test" test_readiness @@ -409,6 +439,7 @@ test_prometheus_query_native_timeout test_query_restrict_tags test_prometheus_remote_write_map_tags test_series +test_label_query_limits_applied echo "Running function correctness tests" test_correctness From 5d2173da9190c763a4cf708ddbf2a26a60af9322 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 22 Jan 
2021 12:02:37 -0500 Subject: [PATCH 04/16] Add paren --- scripts/docker-integration-tests/prometheus/test.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 4cea36b857..d28a33a5cf 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -404,7 +404,7 @@ function test_label_query_limits_applied { echo "Test label series limit with require-exhaustive headers true (below limit therefore no error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values = "200" ]]' + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' # Test the default docs limit applied when directly querying # coordinator (docs limit set by header) @@ -418,7 +418,7 @@ function test_label_query_limits_applied { echo "Test label docs limit with require-exhaustive headers true (below limit therefore no error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values = "200" ]]' + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' } echo "Running readiness test" From c4ee6c616a3119c6147dbe190fc49953527529bc Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 12:36:16 -0500 Subject: [PATCH 05/16] Fixed exhaustive case, remove dead code --- src/dbnode/storage/index.go | 23 ++- src/dbnode/storage/index/aggregate_results.go | 159 ++---------------- src/dbnode/storage/index/block.go | 13 +- src/dbnode/storage/index/block_test.go | 6 +- src/dbnode/storage/index/index_mock.go | 157 ++++++++++++----- src/dbnode/storage/index/types.go | 22 ++- .../storage/index/wide_query_results.go | 4 +- src/dbnode/storage/index_block_test.go | 2 +- 8 files changed, 177 insertions(+), 209 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 64d285f701..be6b89bc9c 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1421,6 +1421,7 @@ func (i *nsIndex) AggregateQuery( results := i.aggregateResultsPool.Get() aopts := index.AggregateResultsOptions{ SizeLimit: opts.SeriesLimit, + DocsLimit: opts.DocsLimit, FieldFilter: opts.FieldFilter, Type: opts.Type, } @@ -1687,7 +1688,16 @@ func (i *nsIndex) execBlockQueryFn( sp.LogFields(logFields...) 
defer sp.Finish() - blockExhaustive, err := block.Query(ctx, cancellable, query, opts, results, logFields) + docResults, ok := results.(index.DocumentResults) + if !ok { // should never happen + state.Lock() + err := fmt.Errorf("unknown results type [%T] received during wide query", results) + state.multiErr = state.multiErr.Add(err) + state.Unlock() + return + } + + blockExhaustive, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1725,7 +1735,16 @@ func (i *nsIndex) execBlockWideQueryFn( sp.LogFields(logFields...) defer sp.Finish() - _, err := block.Query(ctx, cancellable, query, opts, results, logFields) + docResults, ok := results.(index.DocumentResults) + if !ok { // should never happen + state.Lock() + err := fmt.Errorf("unknown results type [%T] received during wide query", results) + state.multiErr = state.multiErr.Add(err) + state.Unlock() + return + } + + _, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 0de716c15d..4d5db6544b 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -22,9 +22,9 @@ package index import ( "fmt" + "math" "sync" - "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" @@ -43,6 +43,7 @@ type aggregatedResults struct { aggregateOpts AggregateResultsOptions resultsMap *AggregateResultsMap + size int totalDocsCount int idPool ident.Pool @@ -101,29 +102,24 @@ func (r *aggregatedResults) Reset( // reset all keys in the map next r.resultsMap.Reset() r.totalDocsCount = 0 + r.size = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. r.Unlock() } -func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) { - r.Lock() - err := r.addDocumentsBatchWithLock(batch) - size := r.resultsMap.Len() - docsCount := r.totalDocsCount + len(batch) - r.totalDocsCount = docsCount - r.Unlock() - return size, docsCount, err -} - func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { return r.aggregateOpts } func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() - maxInsertions := r.aggregateOpts.SizeLimit - r.totalDocsCount + maxInsertions := int(math.MaxInt64) + if r.aggregateOpts.SizeLimit != 0 { + maxInsertions = r.aggregateOpts.SizeLimit - r.totalDocsCount + } + valueInsertions := 0 for _, entry := range batch { f := entry.Field @@ -144,6 +140,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) valuesMap := aggValues.Map() for _, t := range entry.Terms { if !valuesMap.Contains(t) { + fmt.Println(maxInsertions, valueInsertions, t) // we can avoid the copy because we assume ownership of the passed ident.ID, // but still need to finalize it. 
if maxInsertions > valueInsertions { @@ -164,145 +161,11 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } } } - size := r.resultsMap.Len() + docsCount := r.totalDocsCount + valueInsertions r.totalDocsCount = docsCount r.Unlock() - return size, docsCount -} - -func (r *aggregatedResults) addDocumentsBatchWithLock( - batch []doc.Document, -) error { - for _, doc := range batch { - switch r.aggregateOpts.Type { - case AggregateTagNamesAndValues: - if err := r.addDocumentWithLock(doc); err != nil { - return err - } - - case AggregateTagNames: - // NB: if aggregating by name only, can ignore any additional documents - // after the result map size exceeds the optional size limit, since all - // incoming terms are either duplicates or new values which will exceed - // the limit. - size := r.resultsMap.Len() - if r.aggregateOpts.SizeLimit > 0 && size >= r.aggregateOpts.SizeLimit { - return nil - } - - if err := r.addDocumentTermsWithLock(doc); err != nil { - return err - } - default: - return fmt.Errorf("unsupported aggregation type: %v", r.aggregateOpts.Type) - } - } - - return nil -} - -func (r *aggregatedResults) addDocumentTermsWithLock( - container doc.Document, -) error { - document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) - if err != nil { - return fmt.Errorf("unable to decode encoded document; %w", err) - } - for _, field := range document.Fields { - if err := r.addTermWithLock(field.Name); err != nil { - return fmt.Errorf("unable to add document terms [%+v]: %v", document, err) - } - } - - return nil -} - -func (r *aggregatedResults) addTermWithLock( - term []byte, -) error { - if len(term) == 0 { - return fmt.Errorf(missingDocumentFields, "term") - } - - // if a term filter is provided, ensure this field matches the filter, - // otherwise ignore it. - filter := r.aggregateOpts.FieldFilter - if filter != nil && !filter.Allow(term) { - return nil - } - - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. - termID := ident.BytesID(term) - if r.resultsMap.Contains(termID) { - // NB: this term is already added; continue. - return nil - } - - // Set results map to an empty AggregateValues since we only care about - // existence of the term in the map, rather than its set of values. - r.resultsMap.Set(termID, emptyValues) - return nil -} - -func (r *aggregatedResults) addDocumentWithLock( - container doc.Document, -) error { - document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) - if err != nil { - return fmt.Errorf("unable to decode encoded document; %w", err) - } - for _, field := range document.Fields { - if err := r.addFieldWithLock(field.Name, field.Value); err != nil { - return fmt.Errorf("unable to add document [%+v]: %v", document, err) - } - } - - return nil -} - -func (r *aggregatedResults) addFieldWithLock( - term []byte, - value []byte, -) error { - if len(term) == 0 { - return fmt.Errorf(missingDocumentFields, "term") - } - - // if a term filter is provided, ensure this field matches the filter, - // otherwise ignore it. - filter := r.aggregateOpts.FieldFilter - if filter != nil && !filter.Allow(term) { - return nil - } - - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. - termID := ident.BytesID(term) - valueID := ident.BytesID(value) - - valueMap, found := r.resultsMap.Get(termID) - if found { - return valueMap.addValue(valueID) - } - - // NB: if over limit, do not add any new values to the map. 
- if r.aggregateOpts.SizeLimit > 0 && - r.resultsMap.Len() >= r.aggregateOpts.SizeLimit { - // Early return if limit enforced and we hit our limit. - return nil - } - - aggValues := r.valuesPool.Get() - if err := aggValues.addValue(valueID); err != nil { - // Return these values to the pool. - r.valuesPool.Put(aggValues) - return err - } - - r.resultsMap.Set(termID, aggValues) - return nil + return r.resultsMap.Len(), docsCount } func (r *aggregatedResults) Namespace() ident.ID { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 76d902967a..e692f5ccc5 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -403,7 +403,7 @@ func (b *block) Query( cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, logFields []opentracinglog.Field, ) (bool, error) { ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) @@ -423,7 +423,7 @@ func (b *block) queryWithSpan( cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, sp opentracing.Span, logFields []opentracinglog.Field, ) (bool, error) { @@ -532,7 +532,7 @@ func (b *block) closeAsync(closer io.Closer) { func (b *block) addQueryResults( cancellable *xresource.CancellableLifetime, - results BaseResults, + results DocumentResults, batch []doc.Document, source []byte, ) ([]doc.Document, int, int, error) { @@ -550,7 +550,7 @@ func (b *block) addQueryResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the xresource. + // try to add the docs to the resource. size, docsCount, err := results.AddDocuments(batch) // immediately release the checkout on the lifetime of query. @@ -647,7 +647,6 @@ func (b *block) aggregateWithSpan( batch = AggregateResultsEntries(b.opts.AggregateResultsEntryArrayPool().Get()) maxBatch = cap(batch) iterClosed = false // tracking whether we need to free the iterator at the end. - exhaustive = false ) if maxBatch == 0 { maxBatch = defaultAggregateResultsEntryBatchSize @@ -686,7 +685,6 @@ func (b *block) aggregateWithSpan( for _, reader := range readers { if opts.LimitsExceeded(size, resultCount) { - exhaustive = true break } @@ -697,7 +695,6 @@ func (b *block) aggregateWithSpan( iterClosed = false // only once the iterator has been successfully Reset(). for iter.Next() { if opts.LimitsExceeded(size, resultCount) { - exhaustive = true break } @@ -731,7 +728,7 @@ func (b *block) aggregateWithSpan( } } - return exhaustive || opts.exhaustive(size, resultCount), nil + return opts.exhaustive(size, resultCount), nil } func (b *block) appendFieldAndTermToBatch( diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index f33c6b71e9..629c5f3709 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -1866,6 +1866,8 @@ func TestBlockAggregate(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + // NB: seriesLimit must be higher than the number of fields to be exhaustive. 
+ seriesLimit := 5 testMD := newTestNSMetadata(t) start := time.Now().Truncate(time.Hour) blk, err := NewBlock(start, testMD, BlockOptions{}, @@ -1888,7 +1890,7 @@ func TestBlockAggregate(t *testing.T) { } results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ - SizeLimit: 3, + SizeLimit: seriesLimit, Type: AggregateTagNamesAndValues, }, testOpts) @@ -1917,7 +1919,7 @@ func TestBlockAggregate(t *testing.T) { exhaustive, err := b.Aggregate( ctx, xresource.NewCancellableLifetime(), - QueryOptions{SeriesLimit: 3}, + QueryOptions{SeriesLimit: seriesLimit}, results, emptyLogFields) require.NoError(t, err) diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 0c44cf9614..ab6af6aded 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -128,32 +128,123 @@ func (mr *MockBaseResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockBaseResults)(nil).EnforceLimits)) } -// AddDocuments mocks base method -func (m *MockBaseResults) AddDocuments(batch []doc.Document) (int, int, error) { +// Finalize mocks base method +func (m *MockBaseResults) Finalize() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) +} + +// MockDocumentResults is a mock of DocumentResults interface +type MockDocumentResults struct { + ctrl *gomock.Controller + recorder *MockDocumentResultsMockRecorder +} + +// MockDocumentResultsMockRecorder is the mock recorder for MockDocumentResults +type MockDocumentResultsMockRecorder struct { + mock *MockDocumentResults +} + +// NewMockDocumentResults creates a new mock instance +func NewMockDocumentResults(ctrl *gomock.Controller) *MockDocumentResults { + mock := &MockDocumentResults{ctrl: ctrl} + mock.recorder = &MockDocumentResultsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDocumentResults) EXPECT() *MockDocumentResultsMockRecorder { + return m.recorder +} + +// Namespace mocks base method +func (m *MockDocumentResults) Namespace() ident.ID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Namespace") + ret0, _ := ret[0].(ident.ID) + return ret0 +} + +// Namespace indicates an expected call of Namespace +func (mr *MockDocumentResultsMockRecorder) Namespace() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Namespace", reflect.TypeOf((*MockDocumentResults)(nil).Namespace)) +} + +// Size mocks base method +func (m *MockDocumentResults) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + return ret0 } -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockBaseResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { +// Size indicates an expected call of Size +func (mr *MockDocumentResultsMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockDocumentResults)(nil).Size)) +} + +// TotalDocsCount mocks base method +func (m 
*MockDocumentResults) TotalDocsCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDocsCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// TotalDocsCount indicates an expected call of TotalDocsCount +func (mr *MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) +} + +// EnforceLimits mocks base method +func (m *MockDocumentResults) EnforceLimits() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnforceLimits") + ret0, _ := ret[0].(bool) + return ret0 +} + +// EnforceLimits indicates an expected call of EnforceLimits +func (mr *MockDocumentResultsMockRecorder) EnforceLimits() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockBaseResults)(nil).AddDocuments), batch) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockDocumentResults)(nil).EnforceLimits)) } // Finalize mocks base method -func (m *MockBaseResults) Finalize() { +func (m *MockDocumentResults) Finalize() { m.ctrl.T.Helper() m.ctrl.Call(m, "Finalize") } // Finalize indicates an expected call of Finalize -func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { +func (mr *MockDocumentResultsMockRecorder) Finalize() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockDocumentResults)(nil).Finalize)) +} + +// AddDocuments mocks base method +func (m *MockDocumentResults) AddDocuments(batch []doc.Document) (int, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddDocuments", batch) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// AddDocuments indicates an expected call of AddDocuments +func (mr *MockDocumentResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockDocumentResults)(nil).AddDocuments), batch) } // MockQueryResults is a mock of QueryResults interface @@ -235,6 +326,18 @@ func (mr *MockQueryResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockQueryResults)(nil).EnforceLimits)) } +// Finalize mocks base method +func (m *MockQueryResults) Finalize() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) +} + // AddDocuments mocks base method func (m *MockQueryResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() @@ -251,18 +354,6 @@ func (mr *MockQueryResultsMockRecorder) AddDocuments(batch interface{}) *gomock. 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockQueryResults)(nil).AddDocuments), batch) } -// Finalize mocks base method -func (m *MockQueryResults) Finalize() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Finalize") -} - -// Finalize indicates an expected call of Finalize -func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) -} - // Reset mocks base method func (m *MockQueryResults) Reset(nsID ident.ID, opts QueryResultsOptions) { m.ctrl.T.Helper() @@ -429,22 +520,6 @@ func (mr *MockAggregateResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockAggregateResults)(nil).EnforceLimits)) } -// AddDocuments mocks base method -func (m *MockAggregateResults) AddDocuments(batch []doc.Document) (int, int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockAggregateResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockAggregateResults)(nil).AddDocuments), batch) -} - // Finalize mocks base method func (m *MockAggregateResults) Finalize() { m.ctrl.T.Helper() @@ -774,7 +849,7 @@ func (mr *MockBlockMockRecorder) WriteBatch(inserts interface{}) *gomock.Call { } // Query mocks base method -func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results BaseResults, logFields []log.Field) (bool, error) { +func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results DocumentResults, logFields []log.Field) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Query", ctx, cancellable, query, opts, results, logFields) ret0, _ := ret[0].(bool) diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 9e5e07ec07..e54acefc2c 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -159,22 +159,29 @@ type BaseResults interface { // EnforceLimits returns whether this should enforce and increment limits. EnforceLimits() bool + // Finalize releases any resources held by the Results object, + // including returning it to a backing pool. + Finalize() +} + +// DocumentResults is a collection of query results that allow accumulation of +// document values, it is synchronized when access to the results set is used +// as documented by the methods. +type DocumentResults interface { + BaseResults + // AddDocuments adds the batch of documents to the results set, it will // take a copy of the bytes backing the documents so the original can be // modified after this function returns without affecting the results map. // TODO(r): We will need to change this behavior once index fields are // mutable and the most recent need to shadow older entries. AddDocuments(batch []doc.Document) (size, docsCount int, err error) - - // Finalize releases any resources held by the Results object, - // including returning it to a backing pool. 
- Finalize() } // QueryResults is a collection of results for a query, it is synchronized // when access to the results set is used as documented by the methods. type QueryResults interface { - BaseResults + DocumentResults // Reset resets the Results object to initial state. Reset(nsID ident.ID, opts QueryResultsOptions) @@ -259,6 +266,9 @@ type AggregateResultsOptions struct { // overflown will return early successfully. SizeLimit int + // DocsLimit limits the amount of documents + DocsLimit int + // Type determines what result is required. Type AggregationType @@ -357,7 +367,7 @@ type Block interface { cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results BaseResults, + results DocumentResults, logFields []opentracinglog.Field, ) (bool, error) diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index af6b707584..cb5ad85312 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -38,6 +38,8 @@ var ErrWideQueryResultsExhausted = errors.New("no more values to add to wide que type shardFilterFn func(ident.ID) (uint32, bool) +var _ DocumentResults = (*wideResults)(nil) + type wideResults struct { sync.RWMutex size int @@ -72,7 +74,7 @@ func NewWideQueryResults( shardFilter shardFilterFn, collector chan *ident.IDBatch, opts WideQueryOptions, -) BaseResults { +) DocumentResults { batchSize := opts.BatchSize results := &wideResults{ nsID: namespaceID, diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index a4cbadcdfa..5438bcebb9 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -829,7 +829,7 @@ func TestLimits(t *testing.T) { cancellable interface{}, query interface{}, opts interface{}, - results index.BaseResults, + results index.DocumentResults, logFields interface{}) (bool, error) { _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2. From ebfaa356e7e562b3018adf84848ae41f2286382e Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 13:55:52 -0500 Subject: [PATCH 06/16] Aggregate results changes --- src/dbnode/storage/index/aggregate_results.go | 82 ++++++++++++++----- src/dbnode/storage/index/block_test.go | 29 ++++--- 2 files changed, 77 insertions(+), 34 deletions(-) diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 4d5db6544b..f9bc5f12b7 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -21,7 +21,6 @@ package index import ( - "fmt" "math" "sync" @@ -43,7 +42,7 @@ type aggregatedResults struct { aggregateOpts AggregateResultsOptions resultsMap *AggregateResultsMap - size int + size int totalDocsCount int idPool ident.Pool @@ -115,35 +114,81 @@ func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() - maxInsertions := int(math.MaxInt64) + defer r.Unlock() + + maxDocs := int(math.MaxInt64) + if r.aggregateOpts.DocsLimit != 0 { + maxDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount + } + + // NB: already hit doc limit. 
+ if maxDocs <= 0 { + for _, entry := range batch { + entry.Field.Finalize() + for _, term := range entry.Terms { + term.Finalize() + } + } + + return r.size, r.totalDocsCount + } + + maxInserts := maxDocs if r.aggregateOpts.SizeLimit != 0 { - maxInsertions = r.aggregateOpts.SizeLimit - r.totalDocsCount + if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < maxInserts { + maxInserts = remaining + } } + limitTripped := false + docs := 0 valueInsertions := 0 for _, entry := range batch { + if docs >= maxDocs || valueInsertions >= maxInserts { + limitTripped = true + } + + if limitTripped { + entry.Field.Finalize() + for _, term := range entry.Terms { + term.Finalize() + } + + r.size = r.size + valueInsertions + r.totalDocsCount = r.totalDocsCount + docs + return r.size, r.totalDocsCount + } + + docs++ f := entry.Field aggValues, ok := r.resultsMap.Get(f) if !ok { - aggValues = r.valuesPool.Get() - // we can avoid the copy because we assume ownership of the passed ident.ID, - // but still need to finalize it. - r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) + if maxInserts > valueInsertions { + valueInsertions++ + aggValues = r.valuesPool.Get() + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + } else { + // this value exceeds the limit, so should be released to the underling + // pool without adding to the map. + f.Finalize() + } } else { // because we already have a entry for this field, we release the ident back to // the underlying pool. f.Finalize() } + valuesMap := aggValues.Map() for _, t := range entry.Terms { if !valuesMap.Contains(t) { - fmt.Println(maxInsertions, valueInsertions, t) // we can avoid the copy because we assume ownership of the passed ident.ID, // but still need to finalize it. - if maxInsertions > valueInsertions { + if maxInserts > valueInsertions { valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: false, @@ -162,10 +207,9 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } } - docsCount := r.totalDocsCount + valueInsertions - r.totalDocsCount = docsCount - r.Unlock() - return r.resultsMap.Len(), docsCount + r.size = r.size + valueInsertions + r.totalDocsCount = r.totalDocsCount + docs + return r.size, r.totalDocsCount } func (r *aggregatedResults) Namespace() ident.ID { @@ -184,9 +228,9 @@ func (r *aggregatedResults) Map() *AggregateResultsMap { func (r *aggregatedResults) Size() int { r.RLock() - l := r.resultsMap.Len() + size := r.size r.RUnlock() - return l + return size } func (r *aggregatedResults) TotalDocsCount() int { diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 629c5f3709..1b057254cb 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -1867,7 +1867,7 @@ func TestBlockAggregate(t *testing.T) { defer ctrl.Finish() // NB: seriesLimit must be higher than the number of fields to be exhaustive. 
- seriesLimit := 5 + seriesLimit := 10 testMD := newTestNSMetadata(t) start := time.Now().Truncate(time.Hour) blk, err := NewBlock(start, testMD, BlockOptions{}, @@ -1902,20 +1902,19 @@ func TestBlockAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - gomock.InOrder( - iter.EXPECT().Reset(reader, gomock.Any()).Return(nil), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")), - iter.EXPECT().Next().Return(true), - iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")), - iter.EXPECT().Next().Return(false), - iter.EXPECT().Err().Return(nil), - iter.EXPECT().Close().Return(nil), - ) + iter.EXPECT().Reset(reader, gomock.Any()).Return(nil) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")) + iter.EXPECT().Next().Return(false) + iter.EXPECT().Err().Return(nil) + iter.EXPECT().Close().Return(nil) + exhaustive, err := b.Aggregate( ctx, xresource.NewCancellableLifetime(), From 849c92b47fc18deb281ad05ac5599a39b399df1e Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 22 Jan 2021 13:42:15 -0500 Subject: [PATCH 07/16] Add proper require exhaustive conversion + integration test for agg queries --- .../prometheus/test.sh | 16 ++++ src/dbnode/generated/thrift/rpc.thrift | 2 + src/dbnode/generated/thrift/rpc/rpc.go | 92 +++++++++++++++++++ .../server/tchannelthrift/convert/convert.go | 10 ++ .../tchannelthrift/convert/convert_test.go | 27 +++--- src/query/storage/index.go | 11 ++- src/query/storage/index_test.go | 7 +- 7 files changed, 147 insertions(+), 18 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index d28a33a5cf..343f9df1e9 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -406,6 +406,14 @@ function test_label_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' + echo "Test label series limit with require-exhaustive headers true (above limit therefore error)" + # Test that require exhaustive error is returned + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + # Test that require exhaustive error is 4xx + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' + # Test the default docs limit applied when directly querying # coordinator (docs limit set by header) echo "Test label docs limit with coordinator limit header" @@ -419,6 +427,14 @@ 
function test_label_query_limits_applied { echo "Test label docs limit with require-exhaustive headers true (below limit therefore no error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' + + echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)" + # Test that require exhaustive error is returned + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' + # Test that require exhaustive error is 4xx + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' } echo "Running readiness test" diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index e6876db393..fd7d2da373 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -404,6 +404,7 @@ struct AggregateQueryRawRequest { 8: optional TimeType rangeType = TimeType.UNIX_SECONDS 9: optional binary source 10: optional i64 docsLimit + 11: optional bool requireExhaustive } struct AggregateQueryRawResult { @@ -432,6 +433,7 @@ struct AggregateQueryRequest { 8: optional TimeType rangeType = TimeType.UNIX_SECONDS 9: optional binary source 10: optional i64 docsLimit + 11: optional bool requireExhaustive } struct AggregateQueryResult { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index c15f440908..ebad92c114 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -9616,6 +9616,7 @@ func (p *HealthResult_) String() string { // - RangeType // - Source // - DocsLimit +// - RequireExhaustive type AggregateQueryRawRequest struct { Query []byte `thrift:"query,1,required" db:"query" json:"query"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` @@ -9627,6 +9628,7 @@ type AggregateQueryRawRequest struct { RangeType TimeType `thrift:"rangeType,8" db:"rangeType" json:"rangeType,omitempty"` Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` DocsLimit *int64 `thrift:"docsLimit,10" db:"docsLimit" json:"docsLimit,omitempty"` + RequireExhaustive *bool `thrift:"requireExhaustive,11" db:"requireExhaustive" json:"requireExhaustive,omitempty"` } func NewAggregateQueryRawRequest() *AggregateQueryRawRequest { @@ -9694,6 +9696,15 @@ func (p *AggregateQueryRawRequest) GetDocsLimit() int64 { } return *p.DocsLimit } + +var AggregateQueryRawRequest_RequireExhaustive_DEFAULT bool + +func (p *AggregateQueryRawRequest) GetRequireExhaustive() bool { + if !p.IsSetRequireExhaustive() { + return AggregateQueryRawRequest_RequireExhaustive_DEFAULT + } + return *p.RequireExhaustive +} func (p *AggregateQueryRawRequest) IsSetSeriesLimit() bool { return p.SeriesLimit != nil } @@ -9718,6 +9729,10 @@ func (p *AggregateQueryRawRequest) IsSetDocsLimit() bool { return p.DocsLimit != nil } +func (p *AggregateQueryRawRequest) IsSetRequireExhaustive() bool { + return p.RequireExhaustive != nil +} + func (p *AggregateQueryRawRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil 
{ return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -9781,6 +9796,10 @@ func (p *AggregateQueryRawRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField10(iprot); err != nil { return err } + case 11: + if err := p.ReadField11(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -9913,6 +9932,15 @@ func (p *AggregateQueryRawRequest) ReadField10(iprot thrift.TProtocol) error { return nil } +func (p *AggregateQueryRawRequest) ReadField11(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBool(); err != nil { + return thrift.PrependError("error reading field 11: ", err) + } else { + p.RequireExhaustive = &v + } + return nil +} + func (p *AggregateQueryRawRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("AggregateQueryRawRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -9948,6 +9976,9 @@ func (p *AggregateQueryRawRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField10(oprot); err != nil { return err } + if err := p.writeField11(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -10108,6 +10139,21 @@ func (p *AggregateQueryRawRequest) writeField10(oprot thrift.TProtocol) (err err return err } +func (p *AggregateQueryRawRequest) writeField11(oprot thrift.TProtocol) (err error) { + if p.IsSetRequireExhaustive() { + if err := oprot.WriteFieldBegin("requireExhaustive", thrift.BOOL, 11); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 11:requireExhaustive: ", p), err) + } + if err := oprot.WriteBool(bool(*p.RequireExhaustive)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.requireExhaustive (11) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 11:requireExhaustive: ", p), err) + } + } + return err +} + func (p *AggregateQueryRawRequest) String() string { if p == nil { return "" @@ -10544,6 +10590,7 @@ func (p *AggregateQueryRawResultTagValueElement) String() string { // - RangeType // - Source // - DocsLimit +// - RequireExhaustive type AggregateQueryRequest struct { Query *Query `thrift:"query,1" db:"query" json:"query,omitempty"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` @@ -10555,6 +10602,7 @@ type AggregateQueryRequest struct { RangeType TimeType `thrift:"rangeType,8" db:"rangeType" json:"rangeType,omitempty"` Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` DocsLimit *int64 `thrift:"docsLimit,10" db:"docsLimit" json:"docsLimit,omitempty"` + RequireExhaustive *bool `thrift:"requireExhaustive,11" db:"requireExhaustive" json:"requireExhaustive,omitempty"` } func NewAggregateQueryRequest() *AggregateQueryRequest { @@ -10627,6 +10675,15 @@ func (p *AggregateQueryRequest) GetDocsLimit() int64 { } return *p.DocsLimit } + +var AggregateQueryRequest_RequireExhaustive_DEFAULT bool + +func (p *AggregateQueryRequest) GetRequireExhaustive() bool { + if !p.IsSetRequireExhaustive() { + return AggregateQueryRequest_RequireExhaustive_DEFAULT + } + return *p.RequireExhaustive +} func (p *AggregateQueryRequest) IsSetQuery() bool { return p.Query != nil } @@ -10655,6 +10712,10 @@ func (p *AggregateQueryRequest) IsSetDocsLimit() bool { return p.DocsLimit != nil } +func (p *AggregateQueryRequest) 
IsSetRequireExhaustive() bool { + return p.RequireExhaustive != nil +} + func (p *AggregateQueryRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -10716,6 +10777,10 @@ func (p *AggregateQueryRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField10(iprot); err != nil { return err } + case 11: + if err := p.ReadField11(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -10844,6 +10909,15 @@ func (p *AggregateQueryRequest) ReadField10(iprot thrift.TProtocol) error { return nil } +func (p *AggregateQueryRequest) ReadField11(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBool(); err != nil { + return thrift.PrependError("error reading field 11: ", err) + } else { + p.RequireExhaustive = &v + } + return nil +} + func (p *AggregateQueryRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("AggregateQueryRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -10879,6 +10953,9 @@ func (p *AggregateQueryRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField10(oprot); err != nil { return err } + if err := p.writeField11(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -11041,6 +11118,21 @@ func (p *AggregateQueryRequest) writeField10(oprot thrift.TProtocol) (err error) return err } +func (p *AggregateQueryRequest) writeField11(oprot thrift.TProtocol) (err error) { + if p.IsSetRequireExhaustive() { + if err := oprot.WriteFieldBegin("requireExhaustive", thrift.BOOL, 11); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 11:requireExhaustive: ", p), err) + } + if err := oprot.WriteBool(bool(*p.RequireExhaustive)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.requireExhaustive (11) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 11:requireExhaustive: ", p), err) + } + } + return err +} + func (p *AggregateQueryRequest) String() string { if p == nil { return "" diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go index f62e60101b..6e313533a4 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go @@ -326,6 +326,9 @@ func FromRPCAggregateQueryRequest( if l := req.DocsLimit; l != nil { opts.DocsLimit = int(*l) } + if r := req.RequireExhaustive; r != nil { + opts.RequireExhaustive = *r + } if len(req.Source) > 0 { opts.Source = req.Source @@ -378,6 +381,9 @@ func FromRPCAggregateQueryRawRequest( if l := req.DocsLimit; l != nil { opts.DocsLimit = int(*l) } + if r := req.RequireExhaustive; r != nil { + opts.RequireExhaustive = *r + } if len(req.Source) > 0 { opts.Source = req.Source @@ -435,6 +441,10 @@ func ToRPCAggregateQueryRawRequest( l := int64(opts.DocsLimit) request.DocsLimit = &l } + if opts.RequireExhaustive { + r := opts.RequireExhaustive + request.RequireExhaustive = &r + } if len(opts.Source) > 0 { request.Source = opts.Source diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert_test.go b/src/dbnode/network/server/tchannelthrift/convert/convert_test.go index f150827389..8e534be415 100644 
--- a/src/dbnode/network/server/tchannelthrift/convert/convert_test.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert_test.go @@ -172,16 +172,18 @@ func TestConvertFetchTaggedRequest(t *testing.T) { func TestConvertAggregateRawQueryRequest(t *testing.T) { var ( - seriesLimit int64 = 10 - docsLimit int64 = 10 - ns = ident.StringID("abc") + seriesLimit int64 = 10 + docsLimit int64 = 10 + requireExhaustive = true + ns = ident.StringID("abc") ) opts := index.AggregationOptions{ QueryOptions: index.QueryOptions{ - StartInclusive: time.Now().Add(-900 * time.Hour), - EndExclusive: time.Now(), - SeriesLimit: int(seriesLimit), - DocsLimit: int(docsLimit), + StartInclusive: time.Now().Add(-900 * time.Hour), + EndExclusive: time.Now(), + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), + RequireExhaustive: requireExhaustive, }, Type: index.AggregateTagNamesAndValues, FieldFilter: index.AggregateFieldFilter{ @@ -190,11 +192,12 @@ func TestConvertAggregateRawQueryRequest(t *testing.T) { }, } requestSkeleton := &rpc.AggregateQueryRawRequest{ - NameSpace: ns.Bytes(), - RangeStart: mustToRpcTime(t, opts.StartInclusive), - RangeEnd: mustToRpcTime(t, opts.EndExclusive), - SeriesLimit: &seriesLimit, - DocsLimit: &docsLimit, + NameSpace: ns.Bytes(), + RangeStart: mustToRpcTime(t, opts.StartInclusive), + RangeEnd: mustToRpcTime(t, opts.EndExclusive), + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, + RequireExhaustive: &requireExhaustive, TagNameFilter: [][]byte{ []byte("some"), []byte("string"), diff --git a/src/query/storage/index.go b/src/query/storage/index.go index 7315076546..a7d2cfd786 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -95,11 +95,12 @@ func FetchOptionsToAggregateOptions( ) index.AggregationOptions { return index.AggregationOptions{ QueryOptions: index.QueryOptions{ - SeriesLimit: fetchOptions.SeriesLimit, - DocsLimit: fetchOptions.DocsLimit, - Source: fetchOptions.Source, - StartInclusive: tagQuery.Start, - EndExclusive: tagQuery.End, + SeriesLimit: fetchOptions.SeriesLimit, + DocsLimit: fetchOptions.DocsLimit, + Source: fetchOptions.Source, + RequireExhaustive: fetchOptions.RequireExhaustive, + StartInclusive: tagQuery.Start, + EndExclusive: tagQuery.End, }, FieldFilter: tagQuery.FilterNameTags, Type: convertAggregateQueryType(tagQuery.CompleteNameOnly), diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index a71c5b1800..8f1edc97ad 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -256,7 +256,9 @@ func TestFetchQueryToM3Query(t *testing.T) { func TestFetchOptionsToAggregateOptions(t *testing.T) { fetchOptions := &FetchOptions{ - SeriesLimit: 7, + SeriesLimit: 7, + DocsLimit: 8, + RequireExhaustive: true, } end := time.Now() @@ -281,4 +283,7 @@ func TestFetchOptionsToAggregateOptions(t *testing.T) { assert.Equal(t, index.AggregateTagNames, aggOpts.Type) require.Equal(t, 1, len(aggOpts.FieldFilter)) require.Equal(t, "filter", string(aggOpts.FieldFilter[0])) + require.Equal(t, fetchOptions.SeriesLimit, aggOpts.SeriesLimit) + require.Equal(t, fetchOptions.DocsLimit, aggOpts.DocsLimit) + require.Equal(t, fetchOptions.RequireExhaustive, aggOpts.RequireExhaustive) } From 829e6b329c5dc67edcbff010d6fe76744d52c22a Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 22 Jan 2021 14:26:17 -0500 Subject: [PATCH 08/16] Avoid flakiness with high limits --- scripts/docker-integration-tests/prometheus/test.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 343f9df1e9..ad98338ceb 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -404,7 +404,7 @@ function test_label_query_limits_applied { echo "Test label series limit with require-exhaustive headers true (below limit therefore no error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 1000000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' echo "Test label series limit with require-exhaustive headers true (above limit therefore error)" # Test that require exhaustive error is returned @@ -426,7 +426,7 @@ function test_label_query_limits_applied { echo "Test label docs limit with require-exhaustive headers true (below limit therefore no error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' + '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1000000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)" # Test that require exhaustive error is returned From 31f001bc1b6f8b1db6762080dd10652d29dd34dd Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 14:42:34 -0500 Subject: [PATCH 09/16] Limit on docs or inserts --- src/dbnode/storage/index/aggregate_results.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index f9bc5f12b7..06bab4dd65 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -185,10 +185,11 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) valuesMap := aggValues.Map() for _, t := range entry.Terms { + docs++ if !valuesMap.Contains(t) { // we can avoid the copy because we assume ownership of the passed ident.ID, // but still need to finalize it. 
-			if maxInserts > valueInsertions {
+			if maxInserts > valueInsertions || maxDocs > docs {
 				valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
 					NoCopyKey: true,
 					NoFinalizeKey: false,

From 172100f03cea23c25a7f70435ea7c0c8dff72d5f Mon Sep 17 00:00:00 2001
From: Wesley Kim
Date: Fri, 22 Jan 2021 14:49:57 -0500
Subject: [PATCH 10/16] Fixup integration test

---
 scripts/docker-integration-tests/prometheus/test.sh | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh
index ad98338ceb..921da6f27e 100755
--- a/scripts/docker-integration-tests/prometheus/test.sh
+++ b/scripts/docker-integration-tests/prometheus/test.sh
@@ -392,6 +392,11 @@ function test_series {
 }
 
 function test_label_query_limits_applied {
+  # Test that require exhaustive does nothing if limits are not hit
+  echo "Test label limits with require-exhaustive headers true (below limit therefore no error)"
+  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
+    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]'
+
   # Test the default series limit applied when directly querying
   # coordinator (series limit set by header)
   echo "Test label series limit with coordinator limit header"
@@ -402,10 +407,6 @@ function test_label_query_limits_applied {
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
     '[[ $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]'
 
-  echo "Test label series limit with require-exhaustive headers true (below limit therefore no error)"
-  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 1000000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]'
-
   echo "Test label series limit with require-exhaustive headers true (above limit therefore error)"
   # Test that require exhaustive error is returned
@@ -424,10 +425,6 @@ function test_label_query_limits_applied {
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
     '[[ $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]'
 
-  echo "Test label docs limit with require-exhaustive headers true (below limit therefore no error)"
-  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1000000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]'
-
   echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)"
   # Test that require exhaustive error is returned

From 84d5a8622d53d64c842eb9173c2023cc089ed9b9 Mon Sep 17 00:00:00 2001
From: Wesley Kim
Date: Fri, 22 Jan 2021 15:25:34 -0500
Subject: [PATCH 11/16] Add more precise assertions to label query limits integration test

---
 scripts/docker-integration-tests/prometheus/test.sh | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh
index 921da6f27e..3b2564d09a 100755
--- a/scripts/docker-integration-tests/prometheus/test.sh
+++ b/scripts/docker-integration-tests/prometheus/test.sh
@@ -399,13 +399,13 @@ function test_label_query_limits_applied {
 
   # Test the default series limit applied when directly querying
   # coordinator (series limit set by header)
-  echo "Test label series limit with coordinator limit header"
+  echo "Test label series limit with coordinator limit header (default requires exhaustive so error)"
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]'
+    '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
 
   echo "Test label series limit with require-exhaustive headers false"
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]'
+    '[[ $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'
 
   echo "Test label series limit with require-exhaustive headers true (above limit therefore error)"
   # Test that require exhaustive error is returned
@@ -417,13 +417,13 @@ function test_label_query_limits_applied {
 
   # Test the default docs limit applied when directly querying
   # coordinator (docs limit set by header)
-  echo "Test label docs limit with coordinator limit header"
+  echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)"
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]'
+    '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
 
   echo "Test label docs limit with require-exhaustive headers false"
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -lt 3 ]]'
+    '[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'
 
   echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)"
   # Test that require exhaustive error is returned

From dd5ae2ece6d1942396320b3cdf645784a8d37066 Mon Sep 17 00:00:00 2001
From: Artem
Date: Fri, 22 Jan 2021 17:07:08 -0500
Subject: [PATCH 12/16] Finish test fixes and refactor

---
 src/dbnode/storage/index/aggregate_results.go | 52 +-
 .../storage/index/aggregate_results_test.go | 577 ++++++------------
 src/dbnode/storage/index/block_test.go | 2 +-
 3 files changed, 194 insertions(+), 437 deletions(-)

diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go
index 06bab4dd65..2f4ee5044a 100644
--- a/src/dbnode/storage/index/aggregate_results.go
+++ b/src/dbnode/storage/index/aggregate_results.go
@@ -29,12 +29,6 @@ import (
 	"github.com/m3db/m3/src/x/pool"
 )
 
-const missingDocumentFields = "invalid document fields: empty %s"
-
-// NB: emptyValues is an AggregateValues with no values, used for tracking
-// terms only rather than terms and values.
-var emptyValues = AggregateValues{hasValues: false}
-
 type aggregatedResults struct {
 	sync.RWMutex
 
@@ -133,6 +127,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 		return r.size, r.totalDocsCount
 	}
 
+	// NB: cannot insert more than max docs, so that acts as the upper bound here.
 	maxInserts := maxDocs
 	if r.aggregateOpts.SizeLimit != 0 {
 		if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < maxInserts {
@@ -142,9 +137,9 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 	limitTripped := false
 	docs := 0
-	valueInsertions := 0
+	numInserts := 0
 	for _, entry := range batch {
-		if docs >= maxDocs || valueInsertions >= maxInserts {
+		if docs >= maxDocs || numInserts >= maxInserts {
 			limitTripped = true
 		}
@@ -154,7 +149,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 				term.Finalize()
 			}
 
-			r.size = r.size + valueInsertions
+			r.size = r.size + numInserts
 			r.totalDocsCount = r.totalDocsCount + docs
 			return r.size, r.totalDocsCount
 		}
@@ -163,8 +158,8 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 		f := entry.Field
 		aggValues, ok := r.resultsMap.Get(f)
 		if !ok {
-			if maxInserts > valueInsertions {
-				valueInsertions++
+			if maxInserts > numInserts {
+				numInserts++
 				aggValues = r.valuesPool.Get()
 				// we can avoid the copy because we assume ownership of the passed ident.ID,
 				// but still need to finalize it.
@@ -185,30 +180,27 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
 		valuesMap := aggValues.Map()
 		for _, t := range entry.Terms {
-			docs++
-			if !valuesMap.Contains(t) {
-				// we can avoid the copy because we assume ownership of the passed ident.ID,
-				// but still need to finalize it.
-				if maxInserts > valueInsertions || maxDocs > docs {
-					valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
-						NoCopyKey: true,
-						NoFinalizeKey: false,
-					})
-					valueInsertions++
-				} else {
-					// this value exceeds the limit, so should be released to the underling
-					// pool without adding to the map.
-					t.Finalize()
+			if maxDocs > docs {
+				docs++
+				if !valuesMap.Contains(t) {
+					// we can avoid the copy because we assume ownership of the passed ident.ID,
+					// but still need to finalize it.
+					if maxInserts > numInserts {
+						valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
+							NoCopyKey: true,
+							NoFinalizeKey: false,
+						})
+						numInserts++
+						continue
+					}
 				}
-			} else {
-				// because we already have a entry for this term, we release the ident back to
-				// the underlying pool.
- t.Finalize() } + + t.Finalize() } } - r.size = r.size + valueInsertions + r.size = r.size + numInserts r.totalDocsCount = r.totalDocsCount + docs return r.size, r.totalDocsCount } diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 7ca2954598..501469c0f6 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -21,444 +21,211 @@ package index import ( - "bytes" + "sort" "testing" - "github.com/m3db/m3/src/m3ninx/doc" - "github.com/m3db/m3/src/x/ident" - xtest "github.com/m3db/m3/src/x/test" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/x/ident" + xtest "github.com/m3db/m3/src/x/test" ) -func genDoc(strs ...string) doc.Document { - if len(strs)%2 != 0 { - panic("invalid test setup; need even str length") - } +func entries(entries ...AggregateResultsEntry) []AggregateResultsEntry { return entries } - fields := make([]doc.Field, len(strs)/2) - for i := range fields { - fields[i] = doc.Field{ - Name: []byte(strs[i*2]), - Value: []byte(strs[i*2+1]), - } +func genResultsEntry(field string, terms ...string) AggregateResultsEntry { + entryTerms := make([]ident.ID, 0, len(terms)) + for _, term := range terms { + entryTerms = append(entryTerms, ident.StringID(term)) } - return doc.NewDocumentFromMetadata(doc.Metadata{Fields: fields}) -} - -func TestAggResultsInsertInvalid(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - assert.True(t, res.EnforceLimits()) - - dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) - size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsInsertEmptyTermValue(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - dValidEmptyTerm := genDoc("foo", "") - size, docsCount, err := res.AddDocuments([]doc.Document{dValidEmptyTerm}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) -} - -func TestAggResultsInsertBatchOfTwo(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("d1", "") - d2 := genDoc("d2", "") - size, docsCount, err := res.AddDocuments([]doc.Document{d1, d2}) - require.NoError(t, err) - require.Equal(t, 2, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 2, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsTermOnlyInsert(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) - size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - dInvalid = genDoc("", "foo") - size, docsCount, err = 
res.AddDocuments([]doc.Document{dInvalid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 0, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) - - valid := genDoc("foo", "") - size, docsCount, err = res.AddDocuments([]doc.Document{valid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 3, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 3, res.TotalDocsCount()) -} - -func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { - dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 1, res.TotalDocsCount()) - - size, docsCount, err = res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - require.Equal(t, 1, res.Size()) - require.Equal(t, 2, res.TotalDocsCount()) -} - -func TestAggResultsInsertIdempotency(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - testAggResultsInsertIdempotency(t, res) -} - -func TestAggResultsTermOnlyInsertIdempotency(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - testAggResultsInsertIdempotency(t, res) -} - -func TestInvalidAggregateType(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: 100, - }, testOpts) - dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.Error(t, err) - require.Equal(t, 0, size) - require.Equal(t, 1, docsCount) -} - -func TestAggResultsSameName(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - rMap := res.Map() - aggVals, ok := rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.Equal(t, 1, aggVals.Size()) - assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) - - d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Document{d2}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 2, docsCount) - - aggVals, ok = rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.Equal(t, 2, aggVals.Size()) - assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) - assert.True(t, aggVals.Map().Contains(ident.StringID("biz"))) -} - -func assertNoValuesInNameOnlyAggregate(t *testing.T, v AggregateValues) { - assert.False(t, v.hasValues) - assert.Nil(t, v.valuesMap) - assert.Nil(t, v.pool) - - assert.Equal(t, 0, v.Size()) - assert.Nil(t, v.Map()) - assert.False(t, v.HasValues()) -} - -func TestAggResultsTermOnlySameName(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - rMap := res.Map() - aggVals, ok := rMap.Get(ident.StringID("foo")) - require.True(t, ok) - assertNoValuesInNameOnlyAggregate(t, aggVals) - - d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Document{d2}) - require.NoError(t, err) - require.Equal(t, 1, size) 
- require.Equal(t, 2, docsCount) - - aggVals, ok = rMap.Get(ident.StringID("foo")) - require.True(t, ok) - require.False(t, aggVals.hasValues) - assertNoValuesInNameOnlyAggregate(t, aggVals) + return AggregateResultsEntry{ + Field: ident.StringID(field), + Terms: entryTerms, + } } -func addMultipleDocuments(t *testing.T, res AggregateResults) (int, int) { - _, _, err := res.AddDocuments([]doc.Document{ - genDoc("foo", "bar"), - genDoc("fizz", "bar"), - genDoc("buzz", "bar"), - }) - require.NoError(t, err) - - _, _, err = res.AddDocuments([]doc.Document{ - genDoc("foo", "biz"), - genDoc("fizz", "bar"), - }) - require.NoError(t, err) - - size, docsCount, err := res.AddDocuments([]doc.Document{ - genDoc("foo", "baz", "buzz", "bag", "qux", "qaz"), - }) - - require.NoError(t, err) - return size, docsCount -} +func toMap(res AggregateResults) map[string][]string { + entries := res.Map().Iter() + resultMap := make(map[string][]string, len(entries)) + for _, entry := range entries { + terms := entry.value.Map().Iter() + resultTerms := make([]string, 0, len(terms)) + for _, term := range terms { + resultTerms = append(resultTerms, term.Key().String()) + } -func expectedTermsOnly(ex map[string][]string) map[string][]string { - m := make(map[string][]string, len(ex)) - for k := range ex { - m[k] = []string{} + sort.Strings(resultTerms) + resultMap[entry.Key().String()] = resultTerms } - return m + return resultMap } -func toFilter(strs ...string) AggregateFieldFilter { - b := make([][]byte, len(strs)) - for i, s := range strs { - b[i] = []byte(s) +func TestAggResultsInsertWithRepeatedFields(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) + entries := entries(genResultsEntry("foo", "baz", "baz", "baz", "qux")) + size, docsCount := res.AddFields(entries) + require.Equal(t, 3, size) + require.Equal(t, 5, docsCount) + require.Equal(t, 3, res.Size()) + require.Equal(t, 5, res.TotalDocsCount()) + + expected := map[string][]string{ + "foo": {"baz", "qux"}, } - return AggregateFieldFilter(b) + assert.Equal(t, expected, toMap(res)) } -var mergeTests = []struct { - name string - opts AggregateResultsOptions - expected map[string][]string -}{ - { - name: "no limit no filter", - opts: AggregateResultsOptions{}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, - "qux": {"qaz"}, +func TestWithLimits(t *testing.T) { + tests := []struct { + name string + entries []AggregateResultsEntry + sizeLimit int + docLimit int + exSeries int + exDocs int + expected map[string][]string + }{ + { + name: "single term", + entries: entries(genResultsEntry("foo")), + exSeries: 1, + exDocs: 1, + expected: map[string][]string{"foo": {}}, }, - }, - { - name: "with limit no filter", - opts: AggregateResultsOptions{SizeLimit: 2}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, + { + name: "same term", + entries: entries(genResultsEntry("foo"), genResultsEntry("foo")), + exSeries: 1, + exDocs: 2, + expected: map[string][]string{"foo": {}}, }, - }, - { - name: "no limit empty filter", - opts: AggregateResultsOptions{FieldFilter: toFilter()}, - expected: map[string][]string{ - "foo": {"bar", "biz", "baz"}, - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, - "qux": {"qaz"}, + { + name: "multiple terms", + entries: entries(genResultsEntry("foo"), genResultsEntry("bar")), + exSeries: 2, + exDocs: 2, + expected: map[string][]string{"foo": {}, "bar": {}}, }, - }, - { - name: "no limit matchless filter", - opts: 
AggregateResultsOptions{FieldFilter: toFilter("zig")}, - expected: map[string][]string{}, - }, - { - name: "empty limit with filter", - opts: AggregateResultsOptions{FieldFilter: toFilter("buzz")}, - expected: map[string][]string{ - "buzz": {"bar", "bag"}, + { + name: "single entry", + entries: entries(genResultsEntry("foo", "bar")), + exSeries: 2, + exDocs: 2, + expected: map[string][]string{"foo": {"bar"}}, }, - }, - { - name: "with limit with filter", - opts: AggregateResultsOptions{ - SizeLimit: 2, FieldFilter: toFilter("buzz", "qux", "fizz")}, - expected: map[string][]string{ - "fizz": {"bar"}, - "buzz": {"bar", "bag"}, + { + name: "single entry multiple fields", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "baz", "qux")), + exSeries: 4, + exDocs: 6, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + }, + { + name: "multiple entry multiple fields", + entries: entries( + genResultsEntry("foo", "bar", "baz"), + genResultsEntry("foo", "baz", "baz", "qux")), + exSeries: 4, + exDocs: 7, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + }, + { + name: "multiple entries", + entries: entries(genResultsEntry("foo", "baz"), genResultsEntry("bar", "baz", "qux")), + exSeries: 5, + exDocs: 5, + expected: map[string][]string{"foo": {"baz"}, "bar": {"baz", "qux"}}, }, - }, -} -func TestAggResultsMerge(t *testing.T) { - for _, tt := range mergeTests { - t.Run(tt.name, func(t *testing.T) { - res := NewAggregateResults(nil, tt.opts, testOpts) - size, docsCount := addMultipleDocuments(t, res) - - require.Equal(t, len(tt.expected), size) - require.Equal(t, 6, docsCount) - ac := res.Map() - require.Equal(t, len(tt.expected), ac.Len()) - for k, v := range tt.expected { - aggVals, ok := ac.Get(ident.StringID(k)) - require.True(t, ok) - require.Equal(t, len(v), aggVals.Size()) - for _, actual := range v { - require.True(t, aggVals.Map().Contains(ident.StringID(actual))) - } - } - }) - } -} + { + name: "single entry query at size limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), + sizeLimit: 4, + exSeries: 4, + exDocs: 5, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + }, + { + name: "single entry query at doc limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), + docLimit: 5, + exSeries: 4, + exDocs: 5, + expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + }, -func TestAggResultsMergeNameOnly(t *testing.T) { - for _, tt := range mergeTests { - t.Run(tt.name+" name only", func(t *testing.T) { - tt.opts.Type = AggregateTagNames - res := NewAggregateResults(nil, tt.opts, testOpts) - size, docsCount := addMultipleDocuments(t, res) - - require.Equal(t, len(tt.expected), size) - require.Equal(t, 6, docsCount) - - ac := res.Map() - require.Equal(t, len(tt.expected), ac.Len()) - for k := range tt.expected { - aggVals, ok := ac.Get(ident.StringID(k)) - require.True(t, ok) - assertNoValuesInNameOnlyAggregate(t, aggVals) - } - }) - } -} + { + name: "single entry query below size limit", + entries: entries(genResultsEntry("foo", "bar", "baz", "qux")), + sizeLimit: 3, + exSeries: 3, + exDocs: 4, + expected: map[string][]string{"foo": {"bar", "baz"}}, + }, + { + name: "single entry query below doc limit", + entries: entries(genResultsEntry("foo", "bar", "bar", "bar", "baz")), + docLimit: 3, + exSeries: 2, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}}, + }, -func TestAggResultsInsertCopies(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - 
dValid := genDoc("foo", "bar") + { + name: "multiple entry query below series limit", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + sizeLimit: 3, + exSeries: 3, + exDocs: 4, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + }, + { + name: "multiple entry query below doc limit", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + docLimit: 3, + exSeries: 3, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + }, - d, ok := dValid.Metadata() - require.True(t, ok) - name := d.Fields[0].Name - value := d.Fields[0].Value - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - found := false - - // our genny generated maps don't provide access to MapEntry directly, - // so we iterate over the map to find the added entry. Could avoid this - // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` - for _, entry := range res.Map().Iter() { - // see if this key has the same value as the added document's ID. - n := entry.Key().Bytes() - if !bytes.Equal(name, n) { - continue - } - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. - require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) - v := entry.Value() - for _, f := range v.Map().Iter() { - v := f.Key().Bytes() - if !bytes.Equal(value, v) { - continue - } - - found = true - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. - require.False(t, xtest.ByteSlicesBackedBySameData(v, value)) - } + { + name: "multiple entry query both limits", + entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), + docLimit: 3, + sizeLimit: 10, + exSeries: 3, + exDocs: 3, + expected: map[string][]string{"foo": {"bar"}, "baz": {}}, + }, } - require.True(t, found) -} - -func TestAggResultsNameOnlyInsertCopies(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - Type: AggregateTagNames, - }, testOpts) - dValid := genDoc("foo", "bar") - d, ok := dValid.Metadata() - require.True(t, ok) - name := d.Fields[0].Name - size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) - - found := false - // our genny generated maps don't provide access to MapEntry directly, - // so we iterate over the map to find the added entry. Could avoid this - // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` - for _, entry := range res.Map().Iter() { - // see if this key has the same value as the added document's ID. - n := entry.Key().Bytes() - if !bytes.Equal(name, n) { - continue - } - - // ensure the underlying []byte for ID/Fields is at a different address - // than the original. 
- require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) - found = true - assertNoValuesInNameOnlyAggregate(t, entry.Value()) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{ + SizeLimit: tt.sizeLimit, + DocsLimit: tt.docLimit, + }, testOpts) + + size, docsCount := res.AddFields(tt.entries) + assert.Equal(t, tt.exSeries, size) + assert.Equal(t, tt.exDocs, docsCount) + assert.Equal(t, tt.exSeries, res.Size()) + assert.Equal(t, tt.exDocs, res.TotalDocsCount()) + + assert.Equal(t, tt.expected, toMap(res)) + }) } - - require.True(t, found) } func TestAggResultsReset(t *testing.T) { res := NewAggregateResults(ident.StringID("qux"), AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) + size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) aggVals, ok := res.Map().Get(ident.StringID("foo")) require.True(t, ok) @@ -504,11 +271,9 @@ func TestAggResultsResetNamespaceClones(t *testing.T) { func TestAggResultFinalize(t *testing.T) { // Create a Results and insert some data. res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Document{d1}) - require.NoError(t, err) - require.Equal(t, 1, size) - require.Equal(t, 1, docsCount) + size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) // Ensure the data is present. rMap := res.Map() diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 1b057254cb..a45687da68 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -2001,7 +2001,7 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { require.False(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ - "f1": {"t1"}, + "f1": {}, }, results) sp.Finish() From b18484e4d6e6e4b2312c068118d92855e154473b Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 17:25:43 -0500 Subject: [PATCH 13/16] Response + lint --- src/dbnode/storage/index.go | 2 +- src/dbnode/storage/index/aggregate_results.go | 14 +++---- .../aggregate_results_entry_arraypool_gen.go | 14 ------- .../storage/index/aggregate_results_test.go | 2 +- src/dbnode/storage/index/block.go | 39 ++++++++++++------- 5 files changed, 33 insertions(+), 38 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index be6b89bc9c..581d39b03f 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1691,7 +1691,7 @@ func (i *nsIndex) execBlockQueryFn( docResults, ok := results.(index.DocumentResults) if !ok { // should never happen state.Lock() - err := fmt.Errorf("unknown results type [%T] received during wide query", results) + err := fmt.Errorf("unknown results type [%T] received during query", results) state.multiErr = state.multiErr.Add(err) state.Unlock() return diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 2f4ee5044a..2d7a3100b3 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -117,9 +117,9 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) // NB: 
already hit doc limit. if maxDocs <= 0 { - for _, entry := range batch { - entry.Field.Finalize() - for _, term := range entry.Terms { + for idx := 0; idx < len(batch); idx++ { + batch[idx].Field.Finalize() + for _, term := range batch[idx].Terms { term.Finalize() } } @@ -149,8 +149,8 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) term.Finalize() } - r.size = r.size + numInserts - r.totalDocsCount = r.totalDocsCount + docs + r.size += numInserts + r.totalDocsCount += docs return r.size, r.totalDocsCount } @@ -200,8 +200,8 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } } - r.size = r.size + numInserts - r.totalDocsCount = r.totalDocsCount + docs + r.size += numInserts + r.totalDocsCount += docs return r.size, r.totalDocsCount } diff --git a/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go b/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go index 1e8f98e59a..66ac84180c 100644 --- a/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go +++ b/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go @@ -48,20 +48,6 @@ import ( // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// AggregateResultsEntries is a slice of AggregateResultsEntry. -type AggregateResultsEntries []AggregateResultsEntry - -// Size is the element size of the aggregated result entries. -func (e AggregateResultsEntries) Size() int { - // NB: add 1 to the entries length for each entry's field. - length := len(e) - for _, entry := range e { - length += len(entry.Terms) - } - - return length -} - // AggregateResultsEntryArrayPool provides a pool for aggregateResultsEntry slices. type AggregateResultsEntryArrayPool interface { // Init initializes the array pool, it needs to be called diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 501469c0f6..bf6368ead4 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -48,7 +48,7 @@ func genResultsEntry(field string, terms ...string) AggregateResultsEntry { func toMap(res AggregateResults) map[string][]string { entries := res.Map().Iter() resultMap := make(map[string][]string, len(entries)) - for _, entry := range entries { + for _, entry := range entries { //nolint:gocritic terms := entry.value.Map().Iter() resultTerms := make([]string, 0, len(terms)) for _, term := range terms { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index e692f5ccc5..d8355e2494 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -641,12 +641,14 @@ func (b *block) aggregateWithSpan( } var ( - source = opts.Source - size = results.Size() - resultCount = results.TotalDocsCount() - batch = AggregateResultsEntries(b.opts.AggregateResultsEntryArrayPool().Get()) - maxBatch = cap(batch) - iterClosed = false // tracking whether we need to free the iterator at the end. + source = opts.Source + size = results.Size() + resultCount = results.TotalDocsCount() + batch = b.opts.AggregateResultsEntryArrayPool().Get() + numAdded = 0 + currBatchSize = 0 + maxBatch = cap(batch) + iterClosed = false // tracking whether we need to free the iterator at the end. 
) if maxBatch == 0 { maxBatch = defaultAggregateResultsEntryBatchSize @@ -699,15 +701,18 @@ func (b *block) aggregateWithSpan( } field, term := iter.Current() - batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - if batch.Size() < maxBatch { + batch, numAdded = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) + currBatchSize += numAdded + if currBatchSize < maxBatch { continue } - batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source) + batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source, currBatchSize) if err != nil { return false, err } + + currBatchSize = 0 } if err := iter.Err(); err != nil { @@ -722,7 +727,7 @@ func (b *block) aggregateWithSpan( // Add last batch to results if remaining. for len(batch) > 0 { - batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source) + batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source, currBatchSize) if err != nil { return false, err } @@ -735,7 +740,7 @@ func (b *block) appendFieldAndTermToBatch( batch []AggregateResultsEntry, field, term []byte, includeTerms bool, -) []AggregateResultsEntry { +) ([]AggregateResultsEntry, int) { // NB(prateek): we make a copy of the (field, term) entries returned // by the iterator during traversal, because the []byte are only valid per entry during // the traversal (i.e. calling Next() invalidates the []byte). We choose to do this @@ -748,6 +753,7 @@ func (b *block) appendFieldAndTermToBatch( lastField []byte lastFieldIsValid bool reuseLastEntry bool + numAppended int ) // we are iterating multiple segments so we may receive duplicates (same field/term), but // as we are iterating one segment at a time, and because the underlying index structures @@ -770,6 +776,7 @@ func (b *block) appendFieldAndTermToBatch( reuseLastEntry = true entry = batch[len(batch)-1] // avoid alloc cause we already have the field } else { + numAppended++ // allocate id because this is the first time we've seen it // NB(r): Iterating fields FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -778,6 +785,7 @@ func (b *block) appendFieldAndTermToBatch( } if includeTerms { + numAppended++ // terms are always new (as far we know without checking the map for duplicates), so we allocate // NB(r): Iterating terms FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -791,7 +799,7 @@ func (b *block) appendFieldAndTermToBatch( batch = append(batch, entry) } - return batch + return batch, numAppended } func (b *block) pooledID(id []byte) ident.ID { @@ -805,12 +813,13 @@ func (b *block) pooledID(id []byte) ident.ID { func (b *block) addAggregateResults( cancellable *xresource.CancellableLifetime, results AggregateResults, - batch AggregateResultsEntries, + batch []AggregateResultsEntry, source []byte, -) (AggregateResultsEntries, int, int, error) { + currBatchSize int, +) ([]AggregateResultsEntry, int, int, error) { // update recently queried docs to monitor memory. 
if results.EnforceLimits() { - if err := b.docsLimit.Inc(batch.Size(), source); err != nil { + if err := b.docsLimit.Inc(currBatchSize, source); err != nil { return batch, 0, 0, err } } From ab538cda529b14422ac4618ee53c8debad605420 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 22 Jan 2021 17:58:30 -0500 Subject: [PATCH 14/16] Improve IT comments --- scripts/docker-integration-tests/prometheus/test.sh | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 3b2564d09a..de4b9ebdef 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -233,7 +233,7 @@ function test_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]' - # Test the default docs limit applied when directly querying + # Test the docs limit applied when directly querying # coordinator (docs limit set by header) echo "Test query docs limit with coordinator limit header" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ @@ -397,8 +397,7 @@ function test_label_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' - # Test the default series limit applied when directly querying - # coordinator (series limit set by header) + # the header takes precedence over the configured default series limit echo "Test label series limit with coordinator limit header (default requires exhaustive so error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' @@ -415,8 +414,6 @@ function test_label_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' - # Test the default docs limit applied when directly querying - # coordinator (docs limit set by header) echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' From b3fdbba2e211e661a2294db16a07a97ef4ce30b2 Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 22 Jan 2021 18:32:03 -0500 Subject: [PATCH 15/16] Response + lint --- src/dbnode/storage/index/aggregate_results.go | 25 ++++++++----------- src/dbnode/storage/index/block.go | 4 +-- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 2d7a3100b3..673aff2847 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -110,13 +110,13 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) r.Lock() defer r.Unlock() - maxDocs := int(math.MaxInt64) + remainingDocs := 
int(math.MaxInt64) if r.aggregateOpts.DocsLimit != 0 { - maxDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount + remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount } // NB: already hit doc limit. - if maxDocs <= 0 { + if remainingDocs <= 0 { for idx := 0; idx < len(batch); idx++ { batch[idx].Field.Finalize() for _, term := range batch[idx].Terms { @@ -128,22 +128,17 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } // NB: cannot insert more than max docs, so that acts as the upper bound here. - maxInserts := maxDocs + remainingInserts := remainingDocs if r.aggregateOpts.SizeLimit != 0 { - if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < maxInserts { - maxInserts = remaining + if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < remainingInserts { + remainingInserts = remaining } } - limitTripped := false docs := 0 numInserts := 0 for _, entry := range batch { - if docs >= maxDocs || numInserts >= maxInserts { - limitTripped = true - } - - if limitTripped { + if docs >= remainingDocs || numInserts >= remainingInserts { entry.Field.Finalize() for _, term := range entry.Terms { term.Finalize() @@ -158,7 +153,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) f := entry.Field aggValues, ok := r.resultsMap.Get(f) if !ok { - if maxInserts > numInserts { + if remainingInserts > numInserts { numInserts++ aggValues = r.valuesPool.Get() // we can avoid the copy because we assume ownership of the passed ident.ID, @@ -180,12 +175,12 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) valuesMap := aggValues.Map() for _, t := range entry.Terms { - if maxDocs > docs { + if remainingDocs > docs { docs++ if !valuesMap.Contains(t) { // we can avoid the copy because we assume ownership of the passed ident.ID, // but still need to finalize it. - if maxInserts > numInserts { + if remainingInserts > numInserts { valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: false, diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index d8355e2494..6a74b74bc4 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -645,10 +645,10 @@ func (b *block) aggregateWithSpan( size = results.Size() resultCount = results.TotalDocsCount() batch = b.opts.AggregateResultsEntryArrayPool().Get() - numAdded = 0 - currBatchSize = 0 maxBatch = cap(batch) iterClosed = false // tracking whether we need to free the iterator at the end. 
+		currBatchSize int
+		numAdded int
 	)
 	if maxBatch == 0 {
 		maxBatch = defaultAggregateResultsEntryBatchSize

From 4221a998a481274e5fabf9e78da2535be5d47cec Mon Sep 17 00:00:00 2001
From: Artem
Date: Fri, 22 Jan 2021 18:43:56 -0500
Subject: [PATCH 16/16] Fix integrations

---
 scripts/docker-integration-tests/prometheus/test.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh
index 3b2564d09a..c3dbfdcc75 100755
--- a/scripts/docker-integration-tests/prometheus/test.sh
+++ b/scripts/docker-integration-tests/prometheus/test.sh
@@ -405,15 +405,15 @@ function test_label_query_limits_applied {
 
   echo "Test label series limit with require-exhaustive headers false"
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'
+    '[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'
 
   echo "Test label series limit with require-exhaustive headers true (above limit therefore error)"
   # Test that require exhaustive error is returned
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
+    '[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
   # Test that require exhaustive error is 4xx
   ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
-    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'
+    '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'
 
   # Test the default docs limit applied when directly querying
   # coordinator (docs limit set by header)