From c15809b24437aae37e217dbc6c0081f2ec4f46c3 Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Sat, 11 May 2019 18:01:20 -0400 Subject: [PATCH 1/2] [dbnode] Optimize index.Aggregate() for FieldQuery --- .../peers_bootstrap_index_aggregate_test.go | 4 +- .../server/tchannelthrift/convert/convert.go | 12 +- .../tchannelthrift/convert/convert_test.go | 2 +- .../tchannelthrift/node/service_test.go | 2 +- src/dbnode/storage/index.go | 18 +- src/dbnode/storage/index/aggregate_results.go | 4 +- .../storage/index/aggregate_results_test.go | 12 +- .../storage/index/aggregated_term_filter.go | 2 +- src/dbnode/storage/index/block.go | 23 ++- src/dbnode/storage/index/block_test.go | 24 ++- .../storage/index/fields_terms_iterator.go | 12 +- .../storage/index/filter_fields_iterator.go | 86 +++++++++ .../index/filter_fields_iterator_test.go | 166 ++++++++++++++++++ .../storage/index/read_through_segment.go | 6 + src/dbnode/storage/index/types.go | 16 +- src/dbnode/storage/index_block_test.go | 56 +++--- src/m3ninx/idx/query.go | 10 ++ src/m3ninx/idx/query_test.go | 64 +++++++ src/m3ninx/index/segment/fst/fst_mock.go | 15 ++ src/m3ninx/index/segment/fst/segment.go | 9 + .../index/segment/fst/writer_reader_test.go | 21 +++ src/m3ninx/index/segment/mem/segment.go | 12 ++ src/m3ninx/index/segment/mem/segment_test.go | 44 +++++ src/m3ninx/index/segment/mem/terms_dict.go | 7 + .../index/segment/mem/terms_dict_test.go | 21 +++ src/m3ninx/index/segment/mem/types.go | 4 + src/m3ninx/index/segment/segment_mock.go | 30 ++++ src/m3ninx/index/segment/types.go | 3 + src/m3ninx/search/query/field.go | 5 + src/query/storage/index.go | 2 +- src/query/storage/index_test.go | 4 +- 31 files changed, 630 insertions(+), 66 deletions(-) create mode 100644 src/dbnode/storage/index/filter_fields_iterator.go create mode 100644 src/dbnode/storage/index/filter_fields_iterator_test.go create mode 100644 src/m3ninx/idx/query_test.go diff --git a/src/dbnode/integration/peers_bootstrap_index_aggregate_test.go b/src/dbnode/integration/peers_bootstrap_index_aggregate_test.go index da0dc6fe87..fa82db0c8a 100644 --- a/src/dbnode/integration/peers_bootstrap_index_aggregate_test.go +++ b/src/dbnode/integration/peers_bootstrap_index_aggregate_test.go @@ -192,7 +192,7 @@ func TestPeersBootstrapIndexAggregateQuery(t *testing.T) { // Now test term filtering, match all new_*r*, filtering on `foo` regexpQuery, err = idx.NewRegexpQuery([]byte("city"), []byte("new_.*r.*")) require.NoError(t, err) - queryOpts.TermFilter = index.AggregateTermFilter([][]byte{[]byte("foo")}) + queryOpts.FieldFilter = index.AggregateFieldFilter([][]byte{[]byte("foo")}) iter, exhaustive, err = session.Aggregate(ns1.ID(), index.Query{regexpQuery}, queryOpts) require.NoError(t, err) @@ -212,7 +212,7 @@ func TestPeersBootstrapIndexAggregateQuery(t *testing.T) { // Now test term filter and tag name filtering, match all new_*r*, names only, filtering on `city` regexpQuery, err = idx.NewRegexpQuery([]byte("city"), []byte("new_.*r.*")) require.NoError(t, err) - queryOpts.TermFilter = index.AggregateTermFilter([][]byte{[]byte("city")}) + queryOpts.FieldFilter = index.AggregateFieldFilter([][]byte{[]byte("city")}) queryOpts.Type = index.AggregateTagNames iter, exhaustive, err = session.Aggregate(ns1.ID(), index.Query{regexpQuery}, queryOpts) diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go index b828f19df6..aa658d06ad 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go @@ -306,9 +306,9 @@ func FromRPCAggregateQueryRequest( return nil, index.Query{}, index.AggregationOptions{}, err } - opts.TermFilter = make(index.AggregateTermFilter, 0, len(req.TagNameFilter)) + opts.FieldFilter = make(index.AggregateFieldFilter, 0, len(req.TagNameFilter)) for _, f := range req.TagNameFilter { - opts.TermFilter = append(opts.TermFilter, []byte(f)) + opts.FieldFilter = append(opts.FieldFilter, []byte(f)) } if req.AggregateQueryType == rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME_VALUE { @@ -351,7 +351,7 @@ func FromRPCAggregateQueryRawRequest( return nil, index.Query{}, index.AggregationOptions{}, err } - opts.TermFilter = index.AggregateTermFilter(req.TagNameFilter) + opts.FieldFilter = index.AggregateFieldFilter(req.TagNameFilter) if req.AggregateQueryType == rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME_VALUE { opts.Type = index.AggregateTagNamesAndValues } else { @@ -407,9 +407,9 @@ func ToRPCAggregateQueryRawRequest( request.AggregateQueryType = rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME } - // TODO(prateek): pool the []byte underlying opts.TermFilter - filters := make([][]byte, 0, len(opts.TermFilter)) - for _, f := range opts.TermFilter { + // TODO(prateek): pool the []byte underlying opts.FieldFilter + filters := make([][]byte, 0, len(opts.FieldFilter)) + for _, f := range opts.FieldFilter { copied := append([]byte(nil), f...) filters = append(filters, copied) } diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert_test.go b/src/dbnode/network/server/tchannelthrift/convert/convert_test.go index 31a92d47b0..2a0a50f002 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert_test.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert_test.go @@ -174,7 +174,7 @@ func TestConvertAggregateRawQueryRequest(t *testing.T) { Limit: 10, }, Type: index.AggregateTagNamesAndValues, - TermFilter: index.AggregateTermFilter{ + FieldFilter: index.AggregateFieldFilter{ []byte("some"), []byte("string"), }, diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 6f745cba97..da638d0eea 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1661,7 +1661,7 @@ func TestServiceAggregate(t *testing.T) { EndExclusive: end, Limit: 10, }, - TermFilter: index.AggregateTermFilter{ + FieldFilter: index.AggregateFieldFilter{ []byte("foo"), []byte("bar"), }, Type: index.AggregateTagNamesAndValues, diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 528d2beca6..ee60fc0ef8 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -909,17 +909,25 @@ func (i *nsIndex) AggregateQuery( ) (index.AggregateQueryResult, error) { // Get results and set the filters, namespace ID and size limit. results := i.aggregateResultsPool.Get() - results.Reset(i.nsMetadata.ID(), index.AggregateResultsOptions{ - SizeLimit: opts.Limit, - TermFilter: opts.TermFilter, - Type: opts.Type, - }) + aopts := index.AggregateResultsOptions{ + SizeLimit: opts.Limit, + FieldFilter: opts.FieldFilter, + Type: opts.Type, + } ctx.RegisterFinalizer(results) // use appropriate fn to query underlying blocks. + // default to block.Query() fn := i.execBlockQueryFn + // use block.Aggregate() when possible if query.Equal(allQuery) { fn = i.execBlockAggregateQueryFn } + field, isField := idx.FieldQuery(query.Query) + if isField { + fn = i.execBlockAggregateQueryFn + aopts.FieldFilter = append(index.AggregateFieldFilter{field}, aopts.FieldFilter...) + } + results.Reset(i.nsMetadata.ID(), aopts) exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn) if err != nil { return index.AggregateQueryResult{}, err diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 9ccc7b383b..792ea47c10 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -199,7 +199,7 @@ func (r *aggregatedResults) addTermWithLock( // if a term filter is provided, ensure this field matches the filter, // otherwise ignore it. - filter := r.aggregateOpts.TermFilter + filter := r.aggregateOpts.FieldFilter if filter != nil && !filter.Allow(term) { return nil } @@ -244,7 +244,7 @@ func (r *aggregatedResults) addFieldWithLock( // if a term filter is provided, ensure this field matches the filter, // otherwise ignore it. - filter := r.aggregateOpts.TermFilter + filter := r.aggregateOpts.FieldFilter if filter != nil && !filter.Allow(term) { return nil } diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index d62a785ad0..41b915a0f2 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -213,13 +213,13 @@ func expectedTermsOnly(ex map[string][]string) map[string][]string { return m } -func toFilter(strs ...string) AggregateTermFilter { +func toFilter(strs ...string) AggregateFieldFilter { b := make([][]byte, len(strs)) for i, s := range strs { b[i] = []byte(s) } - return AggregateTermFilter(b) + return AggregateFieldFilter(b) } var mergeTests = []struct { @@ -247,7 +247,7 @@ var mergeTests = []struct { }, { name: "no limit empty filter", - opts: AggregateResultsOptions{TermFilter: toFilter()}, + opts: AggregateResultsOptions{FieldFilter: toFilter()}, expected: map[string][]string{ "foo": []string{"bar", "biz", "baz"}, "fizz": []string{"bar"}, @@ -257,12 +257,12 @@ var mergeTests = []struct { }, { name: "no limit matchless filter", - opts: AggregateResultsOptions{TermFilter: toFilter("zig")}, + opts: AggregateResultsOptions{FieldFilter: toFilter("zig")}, expected: map[string][]string{}, }, { name: "empty limit with filter", - opts: AggregateResultsOptions{TermFilter: toFilter("buzz")}, + opts: AggregateResultsOptions{FieldFilter: toFilter("buzz")}, expected: map[string][]string{ "buzz": []string{"bar", "bag"}, }, @@ -270,7 +270,7 @@ var mergeTests = []struct { { name: "with limit with filter", opts: AggregateResultsOptions{ - SizeLimit: 2, TermFilter: toFilter("buzz", "qux", "fizz")}, + SizeLimit: 2, FieldFilter: toFilter("buzz", "qux", "fizz")}, expected: map[string][]string{ "fizz": []string{"bar"}, "buzz": []string{"bar", "bag"}, diff --git a/src/dbnode/storage/index/aggregated_term_filter.go b/src/dbnode/storage/index/aggregated_term_filter.go index a340296236..5a661c305f 100644 --- a/src/dbnode/storage/index/aggregated_term_filter.go +++ b/src/dbnode/storage/index/aggregated_term_filter.go @@ -23,7 +23,7 @@ package index import "bytes" // Allow returns true if the given term satisfies the filter. -func (f AggregateTermFilter) Allow(term []byte) bool { +func (f AggregateFieldFilter) Allow(term []byte) bool { if len(f) == 0 { // NB: if filter is empty, all values are valid. return true diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index aa04cd9df1..591cae807a 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -24,6 +24,7 @@ import ( "bytes" "errors" "fmt" + "sort" "sync" "time" @@ -894,6 +895,10 @@ func (b *block) Aggregate( } aggOpts := results.AggregateResultsOptions() + // ensure we iterate the provided filter fields in order. + sort.Slice(aggOpts.FieldFilter, func(i, j int) bool { + return bytes.Compare(aggOpts.FieldFilter[i], aggOpts.FieldFilter[j]) < 0 + }) iterateTerms := aggOpts.Type == AggregateTagNamesAndValues iterateOpts := fieldsAndTermsIteratorOpts{ iterateTerms: iterateTerms, @@ -902,7 +907,23 @@ func (b *block) Aggregate( if bytes.Equal(field, doc.IDReservedFieldName) { return false } - return aggOpts.TermFilter.Allow(field) + return aggOpts.FieldFilter.Allow(field) + }, + fieldIterFn: func(s segment.Segment) (segment.FieldsIterator, error) { + // NB(prateek): we default to using the regular (FST) fields iterator + // unless we have a predefined list of fields we know we need to restrict + // our search to, in which case we iterate that list and check if known values + // in the FST to restrict our search. This is going to be significantly faster + // while len(FieldsFilter) < 5-10 elements; + // but there will exist a ratio between the len(FieldFilter) v size(FST) after which + // iterating the entire FST is faster. + // Here, we chose to avoid factoring that in to our choice because almost all input + // to this function is expected to have (FieldsFilter) pretty small. If that changes + // in the future, we can revisit this. + if len(aggOpts.FieldFilter) == 0 { + return s.FieldsIterable().Fields() + } + return newFilterFieldsIterator(s, aggOpts.FieldFilter) }, } diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index c2a0e12d9e..5b91786811 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -1728,15 +1728,35 @@ func TestBlockE2EInsertAggregate(t *testing.T) { SizeLimit: 10, Type: AggregateTagNamesAndValues, }, testOpts) - exhaustive, err := b.Aggregate(resource.NewCancellableLifetime(), QueryOptions{Limit: 10}, results) require.NoError(t, err) require.True(t, exhaustive) - assertAggregateResultsMapEquals(t, map[string][]string{ "bar": []string{"baz", "qux"}, "some": []string{"more", "other"}, }, results) + + results = NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + SizeLimit: 10, + Type: AggregateTagNamesAndValues, + FieldFilter: AggregateFieldFilter{[]byte("bar")}, + }, testOpts) + exhaustive, err = b.Aggregate(resource.NewCancellableLifetime(), QueryOptions{Limit: 10}, results) + require.NoError(t, err) + require.True(t, exhaustive) + assertAggregateResultsMapEquals(t, map[string][]string{ + "bar": []string{"baz", "qux"}, + }, results) + + results = NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + SizeLimit: 10, + Type: AggregateTagNamesAndValues, + FieldFilter: AggregateFieldFilter{[]byte("random")}, + }, testOpts) + exhaustive, err = b.Aggregate(resource.NewCancellableLifetime(), QueryOptions{Limit: 10}, results) + require.NoError(t, err) + require.True(t, exhaustive) + assertAggregateResultsMapEquals(t, map[string][]string{}, results) } func assertAggregateResultsMapEquals(t *testing.T, expected map[string][]string, observed AggregateResults) { diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go index fea0c91388..517e0f2c2b 100644 --- a/src/dbnode/storage/index/fields_terms_iterator.go +++ b/src/dbnode/storage/index/fields_terms_iterator.go @@ -29,6 +29,7 @@ import ( type fieldsAndTermsIteratorOpts struct { iterateTerms bool allowFn allowFn + fieldIterFn newFieldIterFn } func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool { @@ -38,8 +39,17 @@ func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool { return o.allowFn(f) } +func (o fieldsAndTermsIteratorOpts) newFieldIter(s segment.Segment) (segment.FieldsIterator, error) { + if o.fieldIterFn == nil { + return s.FieldsIterable().Fields() + } + return o.fieldIterFn(s) +} + type allowFn func(field []byte) bool +type newFieldIterFn func(s segment.Segment) (segment.FieldsIterator, error) + type fieldsAndTermsIter struct { seg segment.Segment opts fieldsAndTermsIteratorOpts @@ -81,7 +91,7 @@ func (fti *fieldsAndTermsIter) Reset(s segment.Segment, opts fieldsAndTermsItera if s == nil { return nil } - fiter, err := s.FieldsIterable().Fields() + fiter, err := fti.opts.newFieldIter(s) if err != nil { return err } diff --git a/src/dbnode/storage/index/filter_fields_iterator.go b/src/dbnode/storage/index/filter_fields_iterator.go new file mode 100644 index 0000000000..237d3b0810 --- /dev/null +++ b/src/dbnode/storage/index/filter_fields_iterator.go @@ -0,0 +1,86 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "errors" + + "github.com/m3db/m3/src/m3ninx/index/segment" +) + +var ( + errNoFiltersSpecified = errors.New("no fields specified to filter upon") +) + +func newFilterFieldsIterator( + seg segment.Segment, + fields AggregateFieldFilter, +) (segment.FieldsIterator, error) { + if len(fields) == 0 { + return nil, errNoFiltersSpecified + } + return &filterFieldsIterator{ + seg: seg, + fields: fields, + currentIdx: -1, + }, nil +} + +type filterFieldsIterator struct { + seg segment.Segment + fields AggregateFieldFilter + + err error + currentIdx int +} + +var _ segment.FieldsIterator = &filterFieldsIterator{} + +func (f *filterFieldsIterator) Next() bool { + if f.err != nil { + return false + } + + f.currentIdx++ // required because we start at -1 + for f.currentIdx < len(f.fields) { + field := f.fields[f.currentIdx] + + ok, err := f.seg.ContainsField(field) + if err != nil { + f.err = err + return false + } + + // i.e. we found a field from the filter list contained in the segment. + if ok { + return true + } + + // the current field is unsuitable, so we skip to the next possiblity. + f.currentIdx++ + } + + return false +} + +func (f *filterFieldsIterator) Current() []byte { return f.fields[f.currentIdx] } +func (f *filterFieldsIterator) Err() error { return f.err } +func (f *filterFieldsIterator) Close() error { return nil } diff --git a/src/dbnode/storage/index/filter_fields_iterator_test.go b/src/dbnode/storage/index/filter_fields_iterator_test.go new file mode 100644 index 0000000000..62e8f09be9 --- /dev/null +++ b/src/dbnode/storage/index/filter_fields_iterator_test.go @@ -0,0 +1,166 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "testing" + + "github.com/m3db/m3/src/m3ninx/index/segment" + xtest "github.com/m3db/m3/src/x/test" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +func TestNewFilterFieldsIteratorError(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + s := segment.NewMockSegment(ctrl) + _, err := newFilterFieldsIterator(s, nil) + require.Error(t, err) +} + +func TestNewFilterFieldsIteratorNoMatchesInSegment(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b")} + s := segment.NewMockSegment(ctrl) + iter, err := newFilterFieldsIterator(s, filters) + require.NoError(t, err) + + s.EXPECT().ContainsField(gomock.Any()).Return(false, nil).AnyTimes() + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} + +func TestNewFilterFieldsIteratorFirstMatch(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} + s := segment.NewMockSegment(ctrl) + iter, err := newFilterFieldsIterator(s, filters) + require.NoError(t, err) + + gomock.InOrder( + s.EXPECT().ContainsField([]byte("a")).Return(true, nil), + s.EXPECT().ContainsField([]byte("b")).Return(false, nil), + s.EXPECT().ContainsField([]byte("c")).Return(false, nil), + ) + require.True(t, iter.Next()) + require.Equal(t, "a", string(iter.Current())) + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} + +func TestNewFilterFieldsIteratorMiddleMatch(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} + s := segment.NewMockSegment(ctrl) + iter, err := newFilterFieldsIterator(s, filters) + require.NoError(t, err) + + gomock.InOrder( + s.EXPECT().ContainsField([]byte("a")).Return(false, nil), + s.EXPECT().ContainsField([]byte("b")).Return(true, nil), + s.EXPECT().ContainsField([]byte("c")).Return(false, nil), + ) + require.True(t, iter.Next()) + require.Equal(t, "b", string(iter.Current())) + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} + +func TestNewFilterFieldsIteratorEndMatch(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} + s := segment.NewMockSegment(ctrl) + iter, err := newFilterFieldsIterator(s, filters) + require.NoError(t, err) + + gomock.InOrder( + s.EXPECT().ContainsField([]byte("a")).Return(false, nil), + s.EXPECT().ContainsField([]byte("b")).Return(false, nil), + s.EXPECT().ContainsField([]byte("c")).Return(true, nil), + ) + require.True(t, iter.Next()) + require.Equal(t, "c", string(iter.Current())) + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} + +func TestNewFilterFieldsIteratorAllMatch(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} + s := segment.NewMockSegment(ctrl) + iter, err := newFilterFieldsIterator(s, filters) + require.NoError(t, err) + + gomock.InOrder( + s.EXPECT().ContainsField([]byte("a")).Return(true, nil), + s.EXPECT().ContainsField([]byte("b")).Return(true, nil), + s.EXPECT().ContainsField([]byte("c")).Return(true, nil), + ) + require.True(t, iter.Next()) + require.Equal(t, "a", string(iter.Current())) + require.True(t, iter.Next()) + require.Equal(t, "b", string(iter.Current())) + require.True(t, iter.Next()) + require.Equal(t, "c", string(iter.Current())) + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} + +func TestNewFilterFieldsIteratorRandomMatch(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} + s := segment.NewMockSegment(ctrl) + iter, err := newFilterFieldsIterator(s, filters) + require.NoError(t, err) + + gomock.InOrder( + s.EXPECT().ContainsField([]byte("a")).Return(true, nil), + s.EXPECT().ContainsField([]byte("b")).Return(false, nil), + s.EXPECT().ContainsField([]byte("c")).Return(true, nil), + ) + require.True(t, iter.Next()) + require.Equal(t, "a", string(iter.Current())) + require.True(t, iter.Next()) + require.Equal(t, "c", string(iter.Current())) + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} diff --git a/src/dbnode/storage/index/read_through_segment.go b/src/dbnode/storage/index/read_through_segment.go index 4d65ef9498..45bdc06d03 100644 --- a/src/dbnode/storage/index/read_through_segment.go +++ b/src/dbnode/storage/index/read_through_segment.go @@ -134,6 +134,12 @@ func (r *ReadThroughSegment) ContainsID(id []byte) (bool, error) { return r.segment.ContainsID(id) } +// ContainsField is a pass through call to the segment, since there's no +// postings lists to cache for queries. +func (r *ReadThroughSegment) ContainsField(field []byte) (bool, error) { + return r.segment.ContainsField(field) +} + // Size is a pass through call to the segment, since there's no // postings lists to cache for queries. func (r *ReadThroughSegment) Size() int64 { diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 687a10071f..c6a3a1be68 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -88,8 +88,8 @@ func (o QueryOptions) LimitExceeded(size int) bool { // AggregationOptions enables users to specify constraints on aggregations. type AggregationOptions struct { QueryOptions - TermFilter AggregateTermFilter - Type AggregationType + FieldFilter AggregateFieldFilter + Type AggregationType } // QueryResult is the collection of results for a query. @@ -201,10 +201,10 @@ type AggregateResults interface { Map() *AggregateResultsMap } -// AggregateTermFilter dictates which fields will appear in the aggregated -// result; if filter values exist, only those whose term matches a value in the +// AggregateFieldFilter dictates which fields will appear in the aggregated +// result; if filter values exist, only those whose fields matches a value in the // filter are returned. -type AggregateTermFilter [][]byte +type AggregateFieldFilter [][]byte // AggregateResultsOptions is a set of options to use for results. type AggregateResultsOptions struct { @@ -212,11 +212,11 @@ type AggregateResultsOptions struct { // overflown will return early successfully. SizeLimit int - // Optional param to filter aggregate values. - TermFilter AggregateTermFilter - // Type determines what result is required. Type AggregationType + + // FieldFilter is an optional param to filter aggregate values. + FieldFilter AggregateFieldFilter } // AggregateResultsAllocator allocates AggregateResults types. diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index a32dd6a44e..971b13aca5 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -686,7 +686,7 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - query := idx.NewFieldQuery([]byte("a")) + query := idx.NewTermQuery([]byte("a"), []byte("b")) retention := 2 * time.Hour blockSize := time.Hour now := time.Now().Truncate(blockSize).Add(10 * time.Minute) @@ -805,7 +805,7 @@ func TestNamespaceIndexBlockAggregateQueryReleasingContext(t *testing.T) { opts := testDatabaseOptions() opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(nowFn)) - query := idx.NewFieldQuery([]byte("a")) + query := idx.NewTermQuery([]byte("a"), []byte("b")) b0 := index.NewMockBlock(ctrl) b0.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() b0.EXPECT().Close().Return(nil) @@ -879,11 +879,11 @@ func TestNamespaceIndexBlockAggregateQueryReleasingContext(t *testing.T) { ctx.BlockingClose() } -func TestNamespaceIndexBlockAggregateQueryWithAllQuery(t *testing.T) { +func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - query := idx.NewAllQuery() + queries := []idx.Query{idx.NewAllQuery(), idx.NewFieldQuery([]byte("field"))} retention := 2 * time.Hour blockSize := time.Hour now := time.Now().Truncate(blockSize).Add(10 * time.Minute) @@ -948,35 +948,37 @@ func TestNamespaceIndexBlockAggregateQueryWithAllQuery(t *testing.T) { // only queries as much as is needed (wrt to time) ctx := context.NewContext() - q := index.Query{query} qOpts := index.QueryOptions{ StartInclusive: t0, EndExclusive: now.Add(time.Minute), } aggOpts := index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) - _, err = idx.AggregateQuery(ctx, q, aggOpts) - require.NoError(t, err) + for _, query := range queries { + q := index.Query{query} + b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) + _, err = idx.AggregateQuery(ctx, q, aggOpts) + require.NoError(t, err) - // queries multiple blocks if needed - qOpts = index.QueryOptions{ - StartInclusive: t0, - EndExclusive: t2.Add(time.Minute), - } - aggOpts = index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) - b1.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) - _, err = idx.AggregateQuery(ctx, q, aggOpts) - require.NoError(t, err) - - // stops querying once a block returns non-exhaustive - qOpts = index.QueryOptions{ - StartInclusive: t0, - EndExclusive: t0.Add(time.Minute), + // queries multiple blocks if needed + qOpts = index.QueryOptions{ + StartInclusive: t0, + EndExclusive: t2.Add(time.Minute), + } + aggOpts = index.AggregationOptions{QueryOptions: qOpts} + b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) + b1.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) + _, err = idx.AggregateQuery(ctx, q, aggOpts) + require.NoError(t, err) + + // stops querying once a block returns non-exhaustive + qOpts = index.QueryOptions{ + StartInclusive: t0, + EndExclusive: t0.Add(time.Minute), + } + b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(false, nil) + aggOpts = index.AggregationOptions{QueryOptions: qOpts} + _, err = idx.AggregateQuery(ctx, q, aggOpts) + require.NoError(t, err) } - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(false, nil) - aggOpts = index.AggregationOptions{QueryOptions: qOpts} - _, err = idx.AggregateQuery(ctx, q, aggOpts) - require.NoError(t, err) } diff --git a/src/m3ninx/idx/query.go b/src/m3ninx/idx/query.go index e114673658..50d10588b6 100644 --- a/src/m3ninx/idx/query.go +++ b/src/m3ninx/idx/query.go @@ -55,6 +55,16 @@ func NewFieldQuery(field []byte) Query { } } +// FieldQuery returns a bool indicating whether the Query is a FieldQuery, +// and the backing bytes of the Field. +func FieldQuery(q Query) (field []byte, ok bool) { + fieldQuery, ok := q.query.(*query.FieldQuery) + if !ok { + return nil, false + } + return fieldQuery.Field(), true +} + // NewTermQuery returns a new query for finding documents which match a term exactly. func NewTermQuery(field, term []byte) Query { return Query{ diff --git a/src/m3ninx/idx/query_test.go b/src/m3ninx/idx/query_test.go new file mode 100644 index 0000000000..39395856fe --- /dev/null +++ b/src/m3ninx/idx/query_test.go @@ -0,0 +1,64 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package idx + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFieldQuery(t *testing.T) { + tests := []struct { + name string + query Query + expectedField []byte + expectedOK bool + }{ + { + name: "field query should be convertible", + query: NewFieldQuery([]byte("fruit")), + expectedField: []byte("fruit"), + expectedOK: true, + }, + { + name: "all query should not be convertible", + query: NewAllQuery(), + expectedOK: false, + }, + { + name: "term query should not be convertible", + query: NewTermQuery([]byte("a"), []byte("b")), + expectedOK: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + field, ok := FieldQuery(test.query) + require.Equal(t, test.expectedOK, ok) + if !ok { + return + } + require.Equal(t, test.expectedField, field) + }) + } +} diff --git a/src/m3ninx/index/segment/fst/fst_mock.go b/src/m3ninx/index/segment/fst/fst_mock.go index c201842533..c2df5c03c1 100644 --- a/src/m3ninx/index/segment/fst/fst_mock.go +++ b/src/m3ninx/index/segment/fst/fst_mock.go @@ -237,6 +237,21 @@ func (mr *MockSegmentMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSegment)(nil).Close)) } +// ContainsField mocks base method +func (m *MockSegment) ContainsField(arg0 []byte) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ContainsField", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ContainsField indicates an expected call of ContainsField +func (mr *MockSegmentMockRecorder) ContainsField(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContainsField", reflect.TypeOf((*MockSegment)(nil).ContainsField), arg0) +} + // ContainsID mocks base method func (m *MockSegment) ContainsID(arg0 []byte) (bool, error) { m.ctrl.T.Helper() diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index 2ab492604d..7cfdd47cd0 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -210,6 +210,15 @@ func (r *fsSegment) ContainsID(docID []byte) (bool, error) { return exists, closeErr } +func (r *fsSegment) ContainsField(field []byte) (bool, error) { + r.RLock() + defer r.RUnlock() + if r.closed { + return false, errReaderClosed + } + return r.fieldsFST.Contains(field) +} + func (r *fsSegment) Reader() (index.Reader, error) { r.RLock() defer r.RUnlock() diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index 3bcc6f927a..52bcb605bc 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -229,6 +229,27 @@ func TestFieldsEquals(t *testing.T) { } } +func TestContainsField(t *testing.T) { + for _, test := range testDocuments { + t.Run(test.name, func(t *testing.T) { + for _, tc := range newTestCases(t, test.docs) { + t.Run(tc.name, func(t *testing.T) { + expSeg, obsSeg := tc.expected, tc.observed + expFieldsIter, err := expSeg.FieldsIterable().Fields() + require.NoError(t, err) + expFields := toSlice(t, expFieldsIter) + + for _, f := range expFields { + ok, err := obsSeg.ContainsField(f) + require.NoError(t, err) + require.True(t, ok) + } + }) + } + }) + } +} + func TestTermEquals(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { diff --git a/src/m3ninx/index/segment/mem/segment.go b/src/m3ninx/index/segment/mem/segment.go index d7ed0374cb..17454bc86c 100644 --- a/src/m3ninx/index/segment/mem/segment.go +++ b/src/m3ninx/index/segment/mem/segment.go @@ -148,6 +148,18 @@ func (s *segment) containsIDWithStateLock(id []byte) bool { return s.termsDict.ContainsTerm(doc.IDReservedFieldName, id) } +func (s *segment) ContainsField(f []byte) (bool, error) { + s.state.RLock() + if s.state.closed { + s.state.RUnlock() + return false, sgmt.ErrClosed + } + + contains := s.termsDict.ContainsField(f) + s.state.RUnlock() + return contains, nil +} + func (s *segment) Insert(d doc.Document) ([]byte, error) { s.state.RLock() defer s.state.RUnlock() diff --git a/src/m3ninx/index/segment/mem/segment_test.go b/src/m3ninx/index/segment/mem/segment_test.go index 920343e512..5612d55446 100644 --- a/src/m3ninx/index/segment/mem/segment_test.go +++ b/src/m3ninx/index/segment/mem/segment_test.go @@ -543,6 +543,50 @@ func TestSegmentContainsID(t *testing.T) { require.NoError(t, segment.Close()) } +func TestSegmentContainsField(t *testing.T) { + docs := []doc.Document{ + doc.Document{ + ID: []byte("abc"), + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("apple"), + }, + doc.Field{ + Name: []byte("colour"), + Value: []byte("red"), + }, + }, + }, + doc.Document{ + ID: []byte("cde"), + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("banana"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, + }, + }, + } + b1 := index.NewBatch(docs, index.AllowPartialUpdates()) + segment, err := NewSegment(0, testOptions) + require.NoError(t, err) + + err = segment.InsertBatch(b1) + require.NoError(t, err) + for _, d := range docs { + for _, f := range d.Fields { + ok, err := segment.ContainsField(f.Name) + require.NoError(t, err) + require.True(t, ok) + } + } +} + func TestSegmentInsertBatchPartialErrorAlreadyIndexing(t *testing.T) { b1 := index.NewBatch( []doc.Document{ diff --git a/src/m3ninx/index/segment/mem/terms_dict.go b/src/m3ninx/index/segment/mem/terms_dict.go index 301cbe6a4e..1743a8a0a7 100644 --- a/src/m3ninx/index/segment/mem/terms_dict.go +++ b/src/m3ninx/index/segment/mem/terms_dict.go @@ -54,6 +54,13 @@ func (d *termsDict) Insert(field doc.Field, id postings.ID) error { return postingsMap.Add(field.Value, id) } +func (d *termsDict) ContainsField(field []byte) bool { + d.fields.RLock() + defer d.fields.RUnlock() + _, ok := d.fields.Get(field) + return ok +} + func (d *termsDict) ContainsTerm(field, term []byte) bool { _, found := d.matchTerm(field, term) return found diff --git a/src/m3ninx/index/segment/mem/terms_dict_test.go b/src/m3ninx/index/segment/mem/terms_dict_test.go index f9a188058f..c1ec4e2f40 100644 --- a/src/m3ninx/index/segment/mem/terms_dict_test.go +++ b/src/m3ninx/index/segment/mem/terms_dict_test.go @@ -176,6 +176,27 @@ func (t *termsDictionaryTestSuite) TestContainsTerm() { props.TestingRun(t.T()) } +func (t *termsDictionaryTestSuite) TestContainsField() { + props := getProperties() + props.Property( + "The dictionary should support field lookups", + prop.ForAll( + func(f doc.Field, id postings.ID) (bool, error) { + t.termsDict.Insert(f, id) + + if ok := t.termsDict.ContainsField(f.Name); !ok { + return false, fmt.Errorf("id of new document '%v' is not in postings list of matching documents", id) + } + + return true, nil + }, + genField(), + genDocID(), + )) + + props.TestingRun(t.T()) +} + func (t *termsDictionaryTestSuite) TestMatchTerm() { props := getProperties() props.Property( diff --git a/src/m3ninx/index/segment/mem/types.go b/src/m3ninx/index/segment/mem/types.go index 304e624129..663175f35b 100644 --- a/src/m3ninx/index/segment/mem/types.go +++ b/src/m3ninx/index/segment/mem/types.go @@ -33,6 +33,10 @@ type termsDictionary interface { // Insert inserts the field with the given ID into the terms dictionary. Insert(field doc.Field, id postings.ID) error + // ContainsField returns a bool indicating whether the terms dictionary contains + // the given field. + ContainsField(field []byte) bool + // ContainsTerm returns a bool indicating whether the terms dictionary contains // the given term. ContainsTerm(field, term []byte) bool diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 0609417bcd..e7295eb5b1 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -114,6 +114,21 @@ func (mr *MockSegmentMockRecorder) ContainsID(docID interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContainsID", reflect.TypeOf((*MockSegment)(nil).ContainsID), docID) } +// ContainsField mocks base method +func (m *MockSegment) ContainsField(field []byte) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ContainsField", field) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ContainsField indicates an expected call of ContainsField +func (mr *MockSegmentMockRecorder) ContainsField(field interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContainsField", reflect.TypeOf((*MockSegment)(nil).ContainsField), field) +} + // Reader mocks base method func (m *MockSegment) Reader() (index.Reader, error) { m.ctrl.T.Helper() @@ -537,6 +552,21 @@ func (mr *MockMutableSegmentMockRecorder) ContainsID(docID interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContainsID", reflect.TypeOf((*MockMutableSegment)(nil).ContainsID), docID) } +// ContainsField mocks base method +func (m *MockMutableSegment) ContainsField(field []byte) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ContainsField", field) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ContainsField indicates an expected call of ContainsField +func (mr *MockMutableSegmentMockRecorder) ContainsField(field interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContainsField", reflect.TypeOf((*MockMutableSegment)(nil).ContainsField), field) +} + // Reader mocks base method func (m *MockMutableSegment) Reader() (index.Reader, error) { m.ctrl.T.Helper() diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index babfade664..253235527d 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -53,6 +53,9 @@ type Segment interface { // ContainsID returns a bool indicating if the Segment contains the provided ID. ContainsID(docID []byte) (bool, error) + // ContainsField returns a bool indicating if the Segment contains the provided field. + ContainsField(field []byte) (bool, error) + // Reader returns a point-in-time accessor to search the segment. Reader() (index.Reader, error) diff --git a/src/m3ninx/search/query/field.go b/src/m3ninx/search/query/field.go index fba3b5ca02..882f6bae91 100644 --- a/src/m3ninx/search/query/field.go +++ b/src/m3ninx/search/query/field.go @@ -41,6 +41,11 @@ func NewFieldQuery(field []byte) search.Query { } } +// Field returns the field []byte. +func (q *FieldQuery) Field() []byte { + return q.field +} + // Searcher returns a searcher over the provided readers. func (q *FieldQuery) Searcher() (search.Searcher, error) { return searcher.NewFieldSearcher(q.field) diff --git a/src/query/storage/index.go b/src/query/storage/index.go index 769fc77e4f..07d96e2c5b 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -115,7 +115,7 @@ func FetchOptionsToAggregateOptions( StartInclusive: tagQuery.Start, EndExclusive: tagQuery.End, }, - TermFilter: tagQuery.FilterNameTags, + FieldFilter: tagQuery.FilterNameTags, Type: convertAggregateQueryType(tagQuery.CompleteNameOnly), } } diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index 4b40a7a054..159510e51f 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -211,6 +211,6 @@ func TestFetchOptionsToAggregateOptions(t *testing.T) { assert.Equal(t, end, aggOpts.EndExclusive) assert.Equal(t, start, aggOpts.StartInclusive) assert.Equal(t, index.AggregateTagNames, aggOpts.Type) - require.Equal(t, 1, len(aggOpts.TermFilter)) - require.Equal(t, "filter", string(aggOpts.TermFilter[0])) + require.Equal(t, 1, len(aggOpts.FieldFilter)) + require.Equal(t, "filter", string(aggOpts.FieldFilter[0])) } From 8927592f744e7a9f8e9d0063f0858b939c44adef Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Sun, 12 May 2019 16:43:22 -0400 Subject: [PATCH 2/2] feedback --- src/dbnode/storage/index.go | 3 +- .../storage/index/aggregated_term_filter.go | 40 ++++++++++++++- .../index/aggregated_term_filter_test.go | 50 +++++++++++++++++++ src/dbnode/storage/index/block.go | 5 -- 4 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 src/dbnode/storage/index/aggregated_term_filter_test.go diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index ee60fc0ef8..a4efe0987c 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -925,8 +925,9 @@ func (i *nsIndex) AggregateQuery( field, isField := idx.FieldQuery(query.Query) if isField { fn = i.execBlockAggregateQueryFn - aopts.FieldFilter = append(index.AggregateFieldFilter{field}, aopts.FieldFilter...) + aopts.FieldFilter = aopts.FieldFilter.AddIfMissing(field) } + aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe() results.Reset(i.nsMetadata.ID(), aopts) exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn) if err != nil { diff --git a/src/dbnode/storage/index/aggregated_term_filter.go b/src/dbnode/storage/index/aggregated_term_filter.go index 5a661c305f..e075d80ec4 100644 --- a/src/dbnode/storage/index/aggregated_term_filter.go +++ b/src/dbnode/storage/index/aggregated_term_filter.go @@ -20,7 +20,10 @@ package index -import "bytes" +import ( + "bytes" + "sort" +) // Allow returns true if the given term satisfies the filter. func (f AggregateFieldFilter) Allow(term []byte) bool { @@ -37,3 +40,38 @@ func (f AggregateFieldFilter) Allow(term []byte) bool { return false } + +// AddIfMissing adds the provided field if it's missing from the filter. +func (f AggregateFieldFilter) AddIfMissing(field []byte) AggregateFieldFilter { + for _, fi := range f { + if bytes.Equal(fi, field) { + return f + } + } + f = append(f, field) + return f +} + +// SortAndDedupe sorts and de-dupes the fields in the filter. +func (f AggregateFieldFilter) SortAndDedupe() AggregateFieldFilter { + // short-circuit if possible. + if len(f) <= 1 { + return f + } + // sort the provided filter fields in order. + sort.Slice(f, func(i, j int) bool { + return bytes.Compare(f[i], f[j]) < 0 + }) + // ensure successive elements are not the same. + deduped := make(AggregateFieldFilter, 0, len(f)) + if len(f) > 0 { + deduped = append(deduped, f[0]) + } + for i := 1; i < len(f); i++ { + if bytes.Equal(f[i-1], f[i]) { + continue + } + deduped = append(deduped, f[i]) + } + return deduped +} diff --git a/src/dbnode/storage/index/aggregated_term_filter_test.go b/src/dbnode/storage/index/aggregated_term_filter_test.go new file mode 100644 index 0000000000..1e5b206abf --- /dev/null +++ b/src/dbnode/storage/index/aggregated_term_filter_test.go @@ -0,0 +1,50 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSortAndDedupe(t *testing.T) { + filter := AggregateFieldFilter{ + []byte("c"), + []byte("a"), + []byte("b"), + []byte("a"), + []byte("c"), + []byte("a"), + } + observed := filter.SortAndDedupe() + require.Equal(t, AggregateFieldFilter{ + []byte("a"), + []byte("b"), + []byte("c"), + }, observed) +} + +func TestSortAndDedupeEmpty(t *testing.T) { + filter := AggregateFieldFilter{} + observed := filter.SortAndDedupe() + require.Equal(t, AggregateFieldFilter{}, observed) +} diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 591cae807a..5a08518cb7 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -24,7 +24,6 @@ import ( "bytes" "errors" "fmt" - "sort" "sync" "time" @@ -895,10 +894,6 @@ func (b *block) Aggregate( } aggOpts := results.AggregateResultsOptions() - // ensure we iterate the provided filter fields in order. - sort.Slice(aggOpts.FieldFilter, func(i, j int) bool { - return bytes.Compare(aggOpts.FieldFilter[i], aggOpts.FieldFilter[j]) < 0 - }) iterateTerms := aggOpts.Type == AggregateTagNamesAndValues iterateOpts := fieldsAndTermsIteratorOpts{ iterateTerms: iterateTerms,