Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
prateek committed May 12, 2019
1 parent c15809b commit 8927592
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,9 @@ func (i *nsIndex) AggregateQuery(
field, isField := idx.FieldQuery(query.Query)
if isField {
fn = i.execBlockAggregateQueryFn
aopts.FieldFilter = append(index.AggregateFieldFilter{field}, aopts.FieldFilter...)
aopts.FieldFilter = aopts.FieldFilter.AddIfMissing(field)
}
aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
results.Reset(i.nsMetadata.ID(), aopts)
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn)
if err != nil {
Expand Down
40 changes: 39 additions & 1 deletion src/dbnode/storage/index/aggregated_term_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

package index

import "bytes"
import (
"bytes"
"sort"
)

// Allow returns true if the given term satisfies the filter.
func (f AggregateFieldFilter) Allow(term []byte) bool {
Expand All @@ -37,3 +40,38 @@ func (f AggregateFieldFilter) Allow(term []byte) bool {

return false
}

// AddIfMissing adds the provided field if it's missing from the filter.
func (f AggregateFieldFilter) AddIfMissing(field []byte) AggregateFieldFilter {
for _, fi := range f {
if bytes.Equal(fi, field) {
return f
}
}
f = append(f, field)
return f
}

// SortAndDedupe sorts and de-dupes the fields in the filter.
func (f AggregateFieldFilter) SortAndDedupe() AggregateFieldFilter {
// short-circuit if possible.
if len(f) <= 1 {
return f
}
// sort the provided filter fields in order.
sort.Slice(f, func(i, j int) bool {
return bytes.Compare(f[i], f[j]) < 0
})
// ensure successive elements are not the same.
deduped := make(AggregateFieldFilter, 0, len(f))
if len(f) > 0 {
deduped = append(deduped, f[0])
}
for i := 1; i < len(f); i++ {
if bytes.Equal(f[i-1], f[i]) {
continue
}
deduped = append(deduped, f[i])
}
return deduped
}
50 changes: 50 additions & 0 deletions src/dbnode/storage/index/aggregated_term_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package index

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestSortAndDedupe(t *testing.T) {
filter := AggregateFieldFilter{
[]byte("c"),
[]byte("a"),
[]byte("b"),
[]byte("a"),
[]byte("c"),
[]byte("a"),
}
observed := filter.SortAndDedupe()
require.Equal(t, AggregateFieldFilter{
[]byte("a"),
[]byte("b"),
[]byte("c"),
}, observed)
}

func TestSortAndDedupeEmpty(t *testing.T) {
filter := AggregateFieldFilter{}
observed := filter.SortAndDedupe()
require.Equal(t, AggregateFieldFilter{}, observed)
}
5 changes: 0 additions & 5 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"bytes"
"errors"
"fmt"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -895,10 +894,6 @@ func (b *block) Aggregate(
}

aggOpts := results.AggregateResultsOptions()
// ensure we iterate the provided filter fields in order.
sort.Slice(aggOpts.FieldFilter, func(i, j int) bool {
return bytes.Compare(aggOpts.FieldFilter[i], aggOpts.FieldFilter[j]) < 0
})
iterateTerms := aggOpts.Type == AggregateTagNamesAndValues
iterateOpts := fieldsAndTermsIteratorOpts{
iterateTerms: iterateTerms,
Expand Down

0 comments on commit 8927592

Please sign in to comment.