Skip to content

Commit

Permalink
[dbnode][m3ninx] Use new doc.Document in query results to reduce slic…
Browse files Browse the repository at this point in the history
…e allocations (#3057)
  • Loading branch information
nbroyles authored Jan 13, 2021
1 parent 60e8e70 commit c46f488
Show file tree
Hide file tree
Showing 49 changed files with 860 additions and 371 deletions.
4 changes: 2 additions & 2 deletions src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ func (bsc BootstrapConfiguration) New(
adminClient client.AdminClient,
) (bootstrap.ProcessProvider, error) {
idxOpts := opts.IndexOptions()
compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(idxOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
idxOpts.SegmentBuilderOptions(),
idxOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
10 changes: 8 additions & 2 deletions src/cmd/tools/query_index_segments/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/m3ninx/search/executor"
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/parser/promql"
Expand Down Expand Up @@ -189,11 +190,16 @@ func run(opts runOptions) {
log.Fatal("search execute error", zap.Error(err))
}

reader := docs.NewEncodedDocumentReader()
fields := make(map[string]string)
for iter.Next() {
d := iter.Current()
m, err := docs.MetadataFromDocument(d, reader)
if err != nil {
log.Fatal("error retrieve document metadata", zap.Error(err))
}

key := string(d.ID)
key := string(m.ID)

resultsLock.Lock()
_, ok := results[key]
Expand All @@ -209,7 +215,7 @@ func run(opts runOptions) {
for k := range fields {
delete(fields, k)
}
for _, field := range d.Fields {
for _, field := range m.Fields { // nolint:gocritic
fields[string(field.Name)] = string(field.Value)
}

Expand Down
5 changes: 2 additions & 3 deletions src/dbnode/generated-source-files.mk
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,9 @@ genny-map-persist-fs:
# Map generation rule for storage/index/ResultsMap
.PHONY: genny-map-storage-index-results
genny-map-storage-index-results:
cd $(m3x_package_path) && make hashmap-gen \
cd $(m3x_package_path) && make byteshashmap-gen \
pkg=index \
key_type=ident.ID \
value_type=ident.TagIterator \
value_type=doc.Document \
target_package=$(m3db_package)/src/dbnode/storage/index \
rename_nogen_key=true \
rename_nogen_value=true \
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ func newCompactor(
}

func newCompactorWithErr(opts index.Options) (*compaction.Compactor, error) {
return compaction.NewCompactor(opts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
return compaction.NewCompactor(opts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
opts.SegmentBuilderOptions(),
opts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
11 changes: 8 additions & 3 deletions src/dbnode/integration/write_tagged_quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/test"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/x/xio"
m3ninxidx "github.com/m3db/m3/src/m3ninx/idx"
Expand Down Expand Up @@ -303,9 +304,13 @@ func nodeHasTaggedWrite(t *testing.T, s TestSetup) bool {
require.NoError(t, err)
results := res.Results
require.Equal(t, nsCtx.ID.String(), results.Namespace().String())
tags, ok := results.Map().Get(ident.StringID("quorumTest"))
idxFound := ok && ident.NewTagIterMatcher(ident.MustNewTagStringsIterator(
"foo", "bar", "boo", "baz")).Matches(tags)
doc, ok := results.Map().Get(ident.BytesID("quorumTest"))
idxFound := false
if ok {
tags := test.DocumentToTagIter(t, doc)
idxFound = ident.NewTagIterMatcher(ident.MustNewTagStringsIterator(
"foo", "bar", "boo", "baz")).Matches(tags)
}

if !idxFound {
return false
Expand Down
38 changes: 30 additions & 8 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ import (
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/index"
idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
Expand Down Expand Up @@ -514,10 +516,19 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query
if req.NoData != nil && *req.NoData {
fetchData = false
}
// Re-use reader and id for more memory-efficient processing of
// tags from doc.Metadata
reader := docs.NewEncodedDocumentReader()
id := ident.NewReusableBytesID()
for _, entry := range queryResult.Results.Map().Iter() {
tags := entry.Value()
d := entry.Value()
metadata, err := docs.MetadataFromDocument(d, reader)
if err != nil {
return nil, err
}
tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true})
elem := &rpc.QueryResultElement{
ID: entry.Key().String(),
ID: string(entry.Key()),
Tags: make([]*rpc.Tag, 0, tags.Remaining()),
}
result.Results = append(result.Results, elem)
Expand All @@ -535,8 +546,8 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query
if !fetchData {
continue
}
tsID := entry.Key()
datapoints, err := s.readDatapoints(ctx, db, nsID, tsID, start, end,
id.Reset(entry.Key())
datapoints, err := s.readDatapoints(ctx, db, nsID, id, start, end,
req.ResultTimeType)
if err != nil {
return nil, convert.ToRPCError(err)
Expand Down Expand Up @@ -794,12 +805,23 @@ func (s *service) fetchReadEncoded(ctx context.Context,
defer sp.Finish()

i := 0
// Re-use reader and id for more memory-efficient processing of
// tags from doc.Metadata
reader := docs.NewEncodedDocumentReader()
id := ident.NewReusableBytesID()
for _, entry := range results.Map().Iter() {
idx := i
i++

tsID := entry.Key()
tags := entry.Value()
id.Reset(entry.Key())

d := entry.Value()
metadata, err := docs.MetadataFromDocument(d, reader)
if err != nil {
return err
}
tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true})

enc := s.pools.tagEncoder.Get()
ctx.RegisterFinalizer(enc)
encodedTags, err := s.encodeTags(enc, tags)
Expand All @@ -809,15 +831,15 @@ func (s *service) fetchReadEncoded(ctx context.Context,

elem := &rpc.FetchTaggedIDResult_{
NameSpace: nsIDBytes,
ID: tsID.Bytes(),
ID: id.Bytes(),
EncodedTags: encodedTags.Bytes(),
}
response.Elements = append(response.Elements, elem)
if !fetchData {
continue
}

encoded, err := db.ReadEncoded(ctx, nsID, tsID,
encoded, err := db.ReadEncoded(ctx, nsID, id,
opts.StartInclusive, opts.EndExclusive)
if err != nil {
return convert.ToRPCError(err)
Expand Down
145 changes: 115 additions & 30 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -286,16 +287,37 @@ func TestServiceQuery(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("dzk"),
Value: []byte("baz"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag(tags["foo"][0].name, tags["foo"][0].value),
ident.StringTag(tags["foo"][1].name, tags["foo"][1].value),
)))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag(tags["bar"][0].name, tags["bar"][0].value),
ident.StringTag(tags["bar"][1].name, tags["bar"][1].value),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

mockDB.EXPECT().QueryIDs(
ctx,
Expand Down Expand Up @@ -1579,16 +1601,37 @@ func TestServiceFetchTagged(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("dzk"),
Value: []byte("baz"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("dzk", "baz"),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

mockDB.EXPECT().QueryIDs(
gomock.Any(),
Expand Down Expand Up @@ -1685,16 +1728,37 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) {
req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*"))
require.NoError(t, err)

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("dzk"),
Value: []byte("baz"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("dzk", "baz"),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
require.NoError(t, err)
Expand Down Expand Up @@ -1779,10 +1843,20 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{},
}
md2 := doc.Metadata{
ID: ident.BytesID("bar"),
Fields: []doc.Field{},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.Tags{}))
resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.Tags{}))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))
resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2))

mockDB.EXPECT().QueryIDs(
ctx,
ident.NewIDMatcher(nsID),
Expand Down Expand Up @@ -1931,12 +2005,23 @@ func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

md1 := doc.Metadata{
ID: ident.BytesID("foo"),
Fields: []doc.Field{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
{
Name: []byte("baz"),
Value: []byte("dxk"),
},
},
}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))
resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1))

mockDB.EXPECT().QueryIDs(
gomock.Any(),
Expand Down
Loading

0 comments on commit c46f488

Please sign in to comment.