Skip to content

Commit

Permalink
Update BaseResults to use documents for query results
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Jan 5, 2021
1 parent e88ee26 commit 9dbd2ce
Show file tree
Hide file tree
Showing 25 changed files with 522 additions and 238 deletions.
10 changes: 8 additions & 2 deletions src/cmd/tools/query_index_segments/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/m3ninx/search/executor"
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/parser/promql"
Expand Down Expand Up @@ -189,11 +190,16 @@ func run(opts runOptions) {
log.Fatal("search execute error", zap.Error(err))
}

reader := docs.NewEncodedDocumentReader()
fields := make(map[string]string)
for iter.Next() {
d := iter.Current()
m, err := docs.GetFromDocument(d, reader)
if err != nil {
log.Fatal("error retrieve document metadata", zap.Error(err))
}

key := string(d.ID)
key := string(m.ID)

resultsLock.Lock()
_, ok := results[key]
Expand All @@ -209,7 +215,7 @@ func run(opts runOptions) {
for k := range fields {
delete(fields, k)
}
for _, field := range d.Fields {
for _, field := range m.Fields { // nolint:gocritic
fields[string(field.Name)] = string(field.Value)
}

Expand Down
5 changes: 2 additions & 3 deletions src/dbnode/generated-source-files.mk
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,9 @@ genny-map-persist-fs:
# Map generation rule for storage/index/ResultsMap
.PHONY: genny-map-storage-index-results
genny-map-storage-index-results:
cd $(m3x_package_path) && make hashmap-gen \
cd $(m3x_package_path) && make byteshashmap-gen \
pkg=index \
key_type=ident.ID \
value_type=ident.TagIterator \
value_type=doc.Document \
target_package=$(m3db_package)/src/dbnode/storage/index \
rename_nogen_key=true \
rename_nogen_value=true \
Expand Down
30 changes: 25 additions & 5 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ import (
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/index"
idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
Expand Down Expand Up @@ -514,10 +516,17 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query
if req.NoData != nil && *req.NoData {
fetchData = false
}
reader := docs.NewEncodedDocumentReader()
id := ident.NewReusableBytesID()
for _, entry := range queryResult.Results.Map().Iter() {
tags := entry.Value()
d := entry.Value()
metadata, err := docs.GetFromDocument(d, reader)
if err != nil {
return nil, err
}
tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true})
elem := &rpc.QueryResultElement{
ID: entry.Key().String(),
ID: string(entry.Key()),
Tags: make([]*rpc.Tag, 0, tags.Remaining()),
}
result.Results = append(result.Results, elem)
Expand All @@ -535,7 +544,8 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query
if !fetchData {
continue
}
tsID := entry.Key()
id.Reset(entry.Key())
tsID := id
datapoints, err := s.readDatapoints(ctx, db, nsID, tsID, start, end,
req.ResultTimeType)
if err != nil {
Expand Down Expand Up @@ -794,12 +804,22 @@ func (s *service) fetchReadEncoded(ctx context.Context,
defer sp.Finish()

i := 0
reader := docs.NewEncodedDocumentReader()
id := ident.NewReusableBytesID()
for _, entry := range results.Map().Iter() {
idx := i
i++

tsID := entry.Key()
tags := entry.Value()
id.Reset(entry.Key())
tsID := id

d := entry.Value()
metadata, err := docs.GetFromDocument(d, reader)
if err != nil {
return err
}
tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true})

enc := s.pools.tagEncoder.Get()
ctx.RegisterFinalizer(enc)
encodedTags, err := s.encodeTags(enc, tags)
Expand Down
145 changes: 115 additions & 30 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -286,16 +287,37 @@ func TestServiceQuery(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("dzk"),
Value: []byte("baz"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag(tags["foo"][0].name, tags["foo"][0].value),
ident.StringTag(tags["foo"][1].name, tags["foo"][1].value),
)))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag(tags["bar"][0].name, tags["bar"][0].value),
ident.StringTag(tags["bar"][1].name, tags["bar"][1].value),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

mockDB.EXPECT().QueryIDs(
ctx,
Expand Down Expand Up @@ -1579,16 +1601,37 @@ func TestServiceFetchTagged(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("dzk"),
Value: []byte("baz"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("dzk", "baz"),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

mockDB.EXPECT().QueryIDs(
gomock.Any(),
Expand Down Expand Up @@ -1688,16 +1731,37 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) {
req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*"))
require.NoError(t, err)

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("dzk"),
Value: []byte("baz"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("dzk", "baz"),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
require.NoError(t, err)
Expand Down Expand Up @@ -1782,10 +1846,20 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.Tags{}))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.Tags{}))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

mockDB.EXPECT().QueryIDs(
ctx,
ident.NewIDMatcher(nsID),
Expand Down Expand Up @@ -1934,12 +2008,23 @@ func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))

mockDB.EXPECT().QueryIDs(
gomock.Any(),
Expand Down
19 changes: 15 additions & 4 deletions src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
)
Expand All @@ -49,6 +50,8 @@ type aggregatedResults struct {

pool AggregateResultsPool
valuesPool AggregateValuesPool

encodedDocReader docs.EncodedDocumentReader
}

// NewAggregateResults returns a new AggregateResults object.
Expand Down Expand Up @@ -104,7 +107,7 @@ func (r *aggregatedResults) Reset(
r.Unlock()
}

func (r *aggregatedResults) AddDocuments(batch []doc.Metadata) (int, int, error) {
func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) {
r.Lock()
err := r.addDocumentsBatchWithLock(batch)
size := r.resultsMap.Len()
Expand Down Expand Up @@ -162,7 +165,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
}

func (r *aggregatedResults) addDocumentsBatchWithLock(
batch []doc.Metadata,
batch []doc.Document,
) error {
for _, doc := range batch {
switch r.aggregateOpts.Type {
Expand Down Expand Up @@ -193,8 +196,12 @@ func (r *aggregatedResults) addDocumentsBatchWithLock(
}

func (r *aggregatedResults) addDocumentTermsWithLock(
document doc.Metadata,
container doc.Document,
) error {
document, err := docs.GetFromDocument(container, &r.encodedDocReader)
if err != nil {
return fmt.Errorf("unable to decode encoded document; %w", err)
}
for _, field := range document.Fields {
if err := r.addTermWithLock(field.Name); err != nil {
return fmt.Errorf("unable to add document terms [%+v]: %v", document, err)
Expand Down Expand Up @@ -233,8 +240,12 @@ func (r *aggregatedResults) addTermWithLock(
}

func (r *aggregatedResults) addDocumentWithLock(
document doc.Metadata,
container doc.Document,
) error {
document, err := docs.GetFromDocument(container, &r.encodedDocReader)
if err != nil {
return fmt.Errorf("unable to decode encoded document; %w", err)
}
for _, field := range document.Fields {
if err := r.addFieldWithLock(field.Name, field.Value); err != nil {
return fmt.Errorf("unable to add document [%+v]: %v", document, err)
Expand Down
Loading

0 comments on commit 9dbd2ce

Please sign in to comment.