diff --git a/src/cmd/tools/query_index_segments/main/main.go b/src/cmd/tools/query_index_segments/main/main.go index 1556a4add4..33aa738419 100644 --- a/src/cmd/tools/query_index_segments/main/main.go +++ b/src/cmd/tools/query_index_segments/main/main.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/m3ninx/index" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/m3ninx/search/executor" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/parser/promql" @@ -189,11 +190,16 @@ func run(opts runOptions) { log.Fatal("search execute error", zap.Error(err)) } + reader := docs.NewEncodedDocumentReader() fields := make(map[string]string) for iter.Next() { d := iter.Current() + m, err := docs.GetFromDocument(d, reader) + if err != nil { + log.Fatal("error retrieve document metadata", zap.Error(err)) + } - key := string(d.ID) + key := string(m.ID) resultsLock.Lock() _, ok := results[key] @@ -209,7 +215,7 @@ func run(opts runOptions) { for k := range fields { delete(fields, k) } - for _, field := range d.Fields { + for _, field := range m.Fields { // nolint:gocritic fields[string(field.Name)] = string(field.Value) } diff --git a/src/dbnode/generated-source-files.mk b/src/dbnode/generated-source-files.mk index 748dfeaea9..6ca17180f5 100644 --- a/src/dbnode/generated-source-files.mk +++ b/src/dbnode/generated-source-files.mk @@ -149,10 +149,9 @@ genny-map-persist-fs: # Map generation rule for storage/index/ResultsMap .PHONY: genny-map-storage-index-results genny-map-storage-index-results: - cd $(m3x_package_path) && make hashmap-gen \ + cd $(m3x_package_path) && make byteshashmap-gen \ pkg=index \ - key_type=ident.ID \ - value_type=ident.TagIterator \ + value_type=doc.Document \ target_package=$(m3db_package)/src/dbnode/storage/index \ rename_nogen_key=true \ rename_nogen_value=true \ diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 2250b1ba5a..1e6751be83 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -37,11 +37,13 @@ import ( "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index" + idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts/writes" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/dbnode/x/xpool" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" @@ -514,10 +516,17 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query if req.NoData != nil && *req.NoData { fetchData = false } + reader := docs.NewEncodedDocumentReader() + id := ident.NewReusableBytesID() for _, entry := range queryResult.Results.Map().Iter() { - tags := entry.Value() + d := entry.Value() + metadata, err := docs.GetFromDocument(d, reader) + if err != nil { + return nil, err + } + tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true}) elem := &rpc.QueryResultElement{ - ID: entry.Key().String(), + ID: string(entry.Key()), Tags: make([]*rpc.Tag, 0, tags.Remaining()), } result.Results = append(result.Results, elem) @@ -535,7 +544,8 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query if !fetchData { continue } - tsID := entry.Key() + id.Reset(entry.Key()) + tsID := id datapoints, err := s.readDatapoints(ctx, db, nsID, tsID, start, end, req.ResultTimeType) if err != nil { @@ -794,12 +804,22 @@ func (s *service) fetchReadEncoded(ctx context.Context, defer sp.Finish() i := 0 + reader := docs.NewEncodedDocumentReader() + id := ident.NewReusableBytesID() for _, entry := range results.Map().Iter() { idx := i i++ - tsID := entry.Key() - tags := entry.Value() + id.Reset(entry.Key()) + tsID := id + + d := entry.Value() + metadata, err := docs.GetFromDocument(d, reader) + if err != nil { + return err + } + tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true}) + enc := s.pools.tagEncoder.Get() ctx.RegisterFinalizer(enc) encodedTags, err := s.encodeTags(enc, tags) diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 2b8faf0219..ffa98e1af0 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -44,6 +44,7 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/ts/writes" "github.com/m3db/m3/src/dbnode/x/xio" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" @@ -286,16 +287,37 @@ func TestServiceQuery(t *testing.T) { require.NoError(t, err) qry := index.Query{Query: req} + md1 := doc.Metadata{ + ID: ident.BytesID("foo"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("baz"), + Value: []byte("dxk"), + }, + }, + } + md2 := doc.Metadata{ + ID: ident.BytesID("bar"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("dzk"), + Value: []byte("baz"), + }, + }, + } + resMap := index.NewQueryResults(ident.StringID(nsID), index.QueryResultsOptions{}, testIndexOptions) - resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag(tags["foo"][0].name, tags["foo"][0].value), - ident.StringTag(tags["foo"][1].name, tags["foo"][1].value), - ))) - resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag(tags["bar"][0].name, tags["bar"][0].value), - ident.StringTag(tags["bar"][1].name, tags["bar"][1].value), - ))) + resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) + resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2)) mockDB.EXPECT().QueryIDs( ctx, @@ -1579,16 +1601,37 @@ func TestServiceFetchTagged(t *testing.T) { require.NoError(t, err) qry := index.Query{Query: req} + md1 := doc.Metadata{ + ID: ident.BytesID("foo"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("baz"), + Value: []byte("dxk"), + }, + }, + } + md2 := doc.Metadata{ + ID: ident.BytesID("bar"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("dzk"), + Value: []byte("baz"), + }, + }, + } + resMap := index.NewQueryResults(ident.StringID(nsID), index.QueryResultsOptions{}, testIndexOptions) - resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag("foo", "bar"), - ident.StringTag("baz", "dxk"), - ))) - resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag("foo", "bar"), - ident.StringTag("dzk", "baz"), - ))) + resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) + resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2)) mockDB.EXPECT().QueryIDs( gomock.Any(), @@ -1688,16 +1731,37 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) { req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*")) require.NoError(t, err) + md1 := doc.Metadata{ + ID: ident.BytesID("foo"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("baz"), + Value: []byte("dxk"), + }, + }, + } + md2 := doc.Metadata{ + ID: ident.BytesID("bar"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("dzk"), + Value: []byte("baz"), + }, + }, + } + resMap := index.NewQueryResults(ident.StringID(nsID), index.QueryResultsOptions{}, testIndexOptions) - resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag("foo", "bar"), - ident.StringTag("baz", "dxk"), - ))) - resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag("foo", "bar"), - ident.StringTag("dzk", "baz"), - ))) + resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) + resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2)) startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) @@ -1782,10 +1846,20 @@ func TestServiceFetchTaggedNoData(t *testing.T) { require.NoError(t, err) qry := index.Query{Query: req} + md1 := doc.Metadata{ + ID: ident.BytesID("foo"), + Fields: []doc.Field{}, + } + md2 := doc.Metadata{ + ID: ident.BytesID("bar"), + Fields: []doc.Field{}, + } + resMap := index.NewQueryResults(ident.StringID(nsID), index.QueryResultsOptions{}, testIndexOptions) - resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.Tags{})) - resMap.Map().Set(ident.StringID("bar"), ident.NewTagsIterator(ident.Tags{})) + resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) + resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2)) + mockDB.EXPECT().QueryIDs( ctx, ident.NewIDMatcher(nsID), @@ -1934,12 +2008,23 @@ func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) { require.NoError(t, err) qry := index.Query{Query: req} + md1 := doc.Metadata{ + ID: ident.BytesID("foo"), + Fields: []doc.Field{ + { + Name: []byte("foo"), + Value: []byte("bar"), + }, + { + Name: []byte("baz"), + Value: []byte("dxk"), + }, + }, + } + resMap := index.NewQueryResults(ident.StringID(nsID), index.QueryResultsOptions{}, testIndexOptions) - resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags( - ident.StringTag("foo", "bar"), - ident.StringTag("baz", "dxk"), - ))) + resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) mockDB.EXPECT().QueryIDs( gomock.Any(), diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index adfe613600..de709de96e 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" ) @@ -49,6 +50,8 @@ type aggregatedResults struct { pool AggregateResultsPool valuesPool AggregateValuesPool + + encodedDocReader docs.EncodedDocumentReader } // NewAggregateResults returns a new AggregateResults object. @@ -104,7 +107,7 @@ func (r *aggregatedResults) Reset( r.Unlock() } -func (r *aggregatedResults) AddDocuments(batch []doc.Metadata) (int, int, error) { +func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) { r.Lock() err := r.addDocumentsBatchWithLock(batch) size := r.resultsMap.Len() @@ -162,7 +165,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) } func (r *aggregatedResults) addDocumentsBatchWithLock( - batch []doc.Metadata, + batch []doc.Document, ) error { for _, doc := range batch { switch r.aggregateOpts.Type { @@ -193,8 +196,12 @@ func (r *aggregatedResults) addDocumentsBatchWithLock( } func (r *aggregatedResults) addDocumentTermsWithLock( - document doc.Metadata, + container doc.Document, ) error { + document, err := docs.GetFromDocument(container, &r.encodedDocReader) + if err != nil { + return fmt.Errorf("unable to decode encoded document; %w", err) + } for _, field := range document.Fields { if err := r.addTermWithLock(field.Name); err != nil { return fmt.Errorf("unable to add document terms [%+v]: %v", document, err) @@ -233,8 +240,12 @@ func (r *aggregatedResults) addTermWithLock( } func (r *aggregatedResults) addDocumentWithLock( - document doc.Metadata, + container doc.Document, ) error { + document, err := docs.GetFromDocument(container, &r.encodedDocReader) + if err != nil { + return fmt.Errorf("unable to decode encoded document; %w", err) + } for _, field := range document.Fields { if err := r.addFieldWithLock(field.Name, field.Value); err != nil { return fmt.Errorf("unable to add document [%+v]: %v", document, err) diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 4a92a9f900..7ca2954598 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -32,7 +32,7 @@ import ( "github.com/stretchr/testify/require" ) -func genDoc(strs ...string) doc.Metadata { +func genDoc(strs ...string) doc.Document { if len(strs)%2 != 0 { panic("invalid test setup; need even str length") } @@ -45,15 +45,15 @@ func genDoc(strs ...string) doc.Metadata { } } - return doc.Metadata{Fields: fields} + return doc.NewDocumentFromMetadata(doc.Metadata{Fields: fields}) } func TestAggResultsInsertInvalid(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) assert.True(t, res.EnforceLimits()) - dInvalid := doc.Metadata{Fields: []doc.Field{{}}} - size, docsCount, err := res.AddDocuments([]doc.Metadata{dInvalid}) + dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) + size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) require.Equal(t, 1, docsCount) @@ -62,7 +62,7 @@ func TestAggResultsInsertInvalid(t *testing.T) { require.Equal(t, 1, res.TotalDocsCount()) dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Metadata{dInvalid}) + size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) require.Equal(t, 2, docsCount) @@ -74,7 +74,7 @@ func TestAggResultsInsertInvalid(t *testing.T) { func TestAggResultsInsertEmptyTermValue(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) dValidEmptyTerm := genDoc("foo", "") - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValidEmptyTerm}) + size, docsCount, err := res.AddDocuments([]doc.Document{dValidEmptyTerm}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -87,7 +87,7 @@ func TestAggResultsInsertBatchOfTwo(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) d1 := genDoc("d1", "") d2 := genDoc("d2", "") - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1, d2}) + size, docsCount, err := res.AddDocuments([]doc.Document{d1, d2}) require.NoError(t, err) require.Equal(t, 2, size) require.Equal(t, 2, docsCount) @@ -100,8 +100,8 @@ func TestAggResultsTermOnlyInsert(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{ Type: AggregateTagNames, }, testOpts) - dInvalid := doc.Metadata{Fields: []doc.Field{{}}} - size, docsCount, err := res.AddDocuments([]doc.Metadata{dInvalid}) + dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) + size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) require.Equal(t, 1, docsCount) @@ -110,7 +110,7 @@ func TestAggResultsTermOnlyInsert(t *testing.T) { require.Equal(t, 1, res.TotalDocsCount()) dInvalid = genDoc("", "foo") - size, docsCount, err = res.AddDocuments([]doc.Metadata{dInvalid}) + size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) require.Equal(t, 2, docsCount) @@ -119,7 +119,7 @@ func TestAggResultsTermOnlyInsert(t *testing.T) { require.Equal(t, 2, res.TotalDocsCount()) valid := genDoc("foo", "") - size, docsCount, err = res.AddDocuments([]doc.Metadata{valid}) + size, docsCount, err = res.AddDocuments([]doc.Document{valid}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 3, docsCount) @@ -130,7 +130,7 @@ func TestAggResultsTermOnlyInsert(t *testing.T) { func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -138,7 +138,7 @@ func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { require.Equal(t, 1, res.Size()) require.Equal(t, 1, res.TotalDocsCount()) - size, docsCount, err = res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err = res.AddDocuments([]doc.Document{dValid}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 2, docsCount) @@ -164,7 +164,7 @@ func TestInvalidAggregateType(t *testing.T) { Type: 100, }, testOpts) dValid := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) require.Error(t, err) require.Equal(t, 0, size) require.Equal(t, 1, docsCount) @@ -173,7 +173,7 @@ func TestInvalidAggregateType(t *testing.T) { func TestAggResultsSameName(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -185,7 +185,7 @@ func TestAggResultsSameName(t *testing.T) { assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Metadata{d2}) + size, docsCount, err = res.AddDocuments([]doc.Document{d2}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 2, docsCount) @@ -212,7 +212,7 @@ func TestAggResultsTermOnlySameName(t *testing.T) { Type: AggregateTagNames, }, testOpts) d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -223,7 +223,7 @@ func TestAggResultsTermOnlySameName(t *testing.T) { assertNoValuesInNameOnlyAggregate(t, aggVals) d2 := genDoc("foo", "biz") - size, docsCount, err = res.AddDocuments([]doc.Metadata{d2}) + size, docsCount, err = res.AddDocuments([]doc.Document{d2}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 2, docsCount) @@ -235,20 +235,20 @@ func TestAggResultsTermOnlySameName(t *testing.T) { } func addMultipleDocuments(t *testing.T, res AggregateResults) (int, int) { - _, _, err := res.AddDocuments([]doc.Metadata{ + _, _, err := res.AddDocuments([]doc.Document{ genDoc("foo", "bar"), genDoc("fizz", "bar"), genDoc("buzz", "bar"), }) require.NoError(t, err) - _, _, err = res.AddDocuments([]doc.Metadata{ + _, _, err = res.AddDocuments([]doc.Document{ genDoc("foo", "biz"), genDoc("fizz", "bar"), }) require.NoError(t, err) - size, docsCount, err := res.AddDocuments([]doc.Metadata{ + size, docsCount, err := res.AddDocuments([]doc.Document{ genDoc("foo", "baz", "buzz", "bag", "qux", "qaz"), }) @@ -376,9 +376,12 @@ func TestAggResultsMergeNameOnly(t *testing.T) { func TestAggResultsInsertCopies(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) dValid := genDoc("foo", "bar") - name := dValid.Fields[0].Name - value := dValid.Fields[0].Value - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + + d, ok := dValid.Metadata() + require.True(t, ok) + name := d.Fields[0].Name + value := d.Fields[0].Value + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -419,8 +422,10 @@ func TestAggResultsNameOnlyInsertCopies(t *testing.T) { Type: AggregateTagNames, }, testOpts) dValid := genDoc("foo", "bar") - name := dValid.Fields[0].Name - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + d, ok := dValid.Metadata() + require.True(t, ok) + name := d.Fields[0].Name + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -450,7 +455,7 @@ func TestAggResultsReset(t *testing.T) { res := NewAggregateResults(ident.StringID("qux"), AggregateResultsOptions{}, testOpts) d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -500,7 +505,7 @@ func TestAggResultFinalize(t *testing.T) { // Create a Results and insert some data. res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) d1 := genDoc("foo", "bar") - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 94892d232f..cbb641e6e5 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -533,9 +533,9 @@ func (b *block) closeAsync(closer io.Closer) { func (b *block) addQueryResults( cancellable *xresource.CancellableLifetime, results BaseResults, - batch []doc.Metadata, + batch []doc.Document, source []byte, -) ([]doc.Metadata, int, int, error) { +) ([]doc.Document, int, int, error) { // update recently queried docs to monitor memory. if results.EnforceLimits() { if err := b.docsLimit.Inc(len(batch), source); err != nil { @@ -557,7 +557,7 @@ func (b *block) addQueryResults( cancellable.ReleaseCheckout() // reset batch. - var emptyDoc doc.Metadata + var emptyDoc doc.Document for i := range batch { batch[i] = emptyDoc } diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index f84914962f..60ea14ec72 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -518,11 +518,11 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(fmt.Errorf("randomerr")), dIter.EXPECT().Close(), @@ -559,11 +559,11 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), dIter.EXPECT().Close().Return(nil), @@ -581,8 +581,10 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) - t1, ok := results.Map().Get(ident.StringID(string(testDoc1().ID))) + d, ok := results.Map().Get(testDoc1().ID) require.True(t, ok) + + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) @@ -610,7 +612,7 @@ func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(false), @@ -649,11 +651,11 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), dIter.EXPECT().Close().Return(nil), @@ -670,8 +672,11 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) - t1, ok := results.Map().Get(ident.StringID(string(testDoc1().ID))) + + d, ok := results.Map().Get(testDoc1().ID) require.True(t, ok) + + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) @@ -699,11 +704,11 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), dIter.EXPECT().Close().Return(nil), @@ -722,8 +727,9 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { rMap := results.Map() require.Equal(t, 1, rMap.Len()) - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) @@ -751,11 +757,11 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), dIter.EXPECT().Close().Return(nil), @@ -772,8 +778,9 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) - t1, ok := results.Map().Get(ident.StringID(string(testDoc1().ID))) + d, ok := results.Map().Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) @@ -801,11 +808,11 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { return exec, nil } - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), dIter.EXPECT().Close().Return(nil), @@ -824,8 +831,9 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { rMap := results.Map() require.Equal(t, 1, rMap.Len()) - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) @@ -857,10 +865,12 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { limit := 1 results := NewQueryResults(nil, QueryResultsOptions{SizeLimit: limit}, testOpts) - _, _, err = results.AddDocuments([]doc.Metadata{testDoc1()}) + _, _, err = results.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(testDoc1()), + }) require.NoError(t, err) - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), @@ -878,8 +888,9 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { rMap := results.Map() require.Equal(t, 1, rMap.Len()) - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) @@ -908,16 +919,18 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { } results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - _, _, err = results.AddDocuments([]doc.Metadata{testDoc1()}) + _, _, err = results.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(testDoc1()), + }) require.NoError(t, err) - dIter := doc.NewMockMetadataIterator(ctrl) + dIter := doc.NewMockIterator(ctrl) gomock.InOrder( exec.EXPECT().Execute(gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc1DupeID()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1DupeID())), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Current().Return(testDoc2()), + dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc2())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), dIter.EXPECT().Close().Return(nil), @@ -933,14 +946,16 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { rMap := results.Map() require.Equal(t, 2, rMap.Len()) - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) - t2, ok := rMap.Get(ident.StringID(string(testDoc2().ID))) + d, ok = rMap.Get(testDoc2().ID) require.True(t, ok) + t2 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz", "some", "more")).Matches( t2)) @@ -1402,14 +1417,16 @@ func TestBlockE2EInsertQuery(t *testing.T) { require.Equal(t, 2, results.Size()) rMap := results.Map() - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) - t2, ok := rMap.Get(ident.StringID(string(testDoc2().ID))) + d, ok = rMap.Get(testDoc2().ID) require.True(t, ok) + t2 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz", "some", "more")).Matches( t2)) @@ -1479,17 +1496,19 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) { rMap := results.Map() numFound := 0 - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) if ok { numFound++ + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) } - t2, ok := rMap.Get(ident.StringID(string(testDoc2().ID))) + d, ok = rMap.Get(testDoc2().ID) if ok { numFound++ + t2 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz", "some", "more")).Matches( t2)) @@ -1567,14 +1586,16 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) { require.Equal(t, 2, results.Size()) rMap := results.Map() - t1, ok := rMap.Get(ident.StringID(string(testDoc1().ID))) + d, ok := rMap.Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) - t2, ok := rMap.Get(ident.StringID(string(testDoc2().ID))) + d, ok = rMap.Get(testDoc2().ID) require.True(t, ok) + t2 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz", "some", "more")).Matches( t2)) @@ -1646,14 +1667,16 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) { require.Equal(t, 2, results.Size()) rMap := results.Map() - t1, ok := results.Map().Get(ident.StringID(string(testDoc1().ID))) + d, ok := results.Map().Get(testDoc1().ID) require.True(t, ok) + t1 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz")).Matches( t1)) - t2, ok := rMap.Get(ident.StringID(string(testDoc2().ID))) + d, ok = rMap.Get(testDoc2().ID) require.True(t, ok) + t2 := documentToTagIter(t, d) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("bar", "baz", "some", "more")).Matches( t2)) diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 41e14ef338..0c44cf9614 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/storage/index/types.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -129,7 +129,7 @@ func (mr *MockBaseResultsMockRecorder) EnforceLimits() *gomock.Call { } // AddDocuments mocks base method -func (m *MockBaseResults) AddDocuments(batch []doc.Metadata) (int, int, error) { +func (m *MockBaseResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddDocuments", batch) ret0, _ := ret[0].(int) @@ -236,7 +236,7 @@ func (mr *MockQueryResultsMockRecorder) EnforceLimits() *gomock.Call { } // AddDocuments mocks base method -func (m *MockQueryResults) AddDocuments(batch []doc.Metadata) (int, int, error) { +func (m *MockQueryResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddDocuments", batch) ret0, _ := ret[0].(int) @@ -430,7 +430,7 @@ func (mr *MockAggregateResultsMockRecorder) EnforceLimits() *gomock.Call { } // AddDocuments mocks base method -func (m *MockAggregateResults) AddDocuments(batch []doc.Metadata) (int, int, error) { +func (m *MockAggregateResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddDocuments", batch) ret0, _ := ret[0].(int) @@ -1485,6 +1485,34 @@ func (mr *MockOptionsMockRecorder) DocumentArrayPool() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DocumentArrayPool", reflect.TypeOf((*MockOptions)(nil).DocumentArrayPool)) } +// SetMetadataArrayPool mocks base method +func (m *MockOptions) SetMetadataArrayPool(value doc.MetadataArrayPool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetMetadataArrayPool", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetMetadataArrayPool indicates an expected call of SetMetadataArrayPool +func (mr *MockOptionsMockRecorder) SetMetadataArrayPool(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMetadataArrayPool", reflect.TypeOf((*MockOptions)(nil).SetMetadataArrayPool), value) +} + +// MetadataArrayPool mocks base method +func (m *MockOptions) MetadataArrayPool() doc.MetadataArrayPool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MetadataArrayPool") + ret0, _ := ret[0].(doc.MetadataArrayPool) + return ret0 +} + +// MetadataArrayPool indicates an expected call of MetadataArrayPool +func (mr *MockOptionsMockRecorder) MetadataArrayPool() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MetadataArrayPool", reflect.TypeOf((*MockOptions)(nil).MetadataArrayPool)) +} + // SetAggregateResultsEntryArrayPool mocks base method func (m *MockOptions) SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 9e8a288624..055ab95cac 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -362,7 +362,7 @@ func (o *opts) DocumentArrayPool() doc.DocumentArrayPool { } func (o *opts) SetMetadataArrayPool(value doc.MetadataArrayPool) Options { - opts := *o + opts := *o // nolint:govet opts.metadataArrayPool = value return &opts } diff --git a/src/dbnode/storage/index/results.go b/src/dbnode/storage/index/results.go index c4d09520e1..f22f8fa3b9 100644 --- a/src/dbnode/storage/index/results.go +++ b/src/dbnode/storage/index/results.go @@ -24,8 +24,8 @@ import ( "errors" "sync" - "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" ) @@ -84,19 +84,11 @@ func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { nsID = r.idPool.Clone(nsID) } r.nsID = nsID - // Reset all values from map first if they are present. - for _, entry := range r.resultsMap.Iter() { - tags := entry.Value() - tags.Close() - } // Reset all keys in the map next, this will finalize the keys. r.resultsMap.Reset() r.totalDocsCount = 0 - // NB: could do keys+value in one step but I'm trying to avoid - // using an internal method of a code-gen'd type. - r.opts = opts r.Unlock() @@ -104,7 +96,7 @@ func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { // NB: If documents with duplicate IDs are added, they are simply ignored and // the first document added with an ID is returned. -func (r *results) AddDocuments(batch []doc.Metadata) (int, int, error) { +func (r *results) AddDocuments(batch []doc.Document) (int, int, error) { r.Lock() err := r.addDocumentsBatchWithLock(batch) size := r.resultsMap.Len() @@ -114,7 +106,7 @@ func (r *results) AddDocuments(batch []doc.Metadata) (int, int, error) { return size, docsCount, err } -func (r *results) addDocumentsBatchWithLock(batch []doc.Metadata) error { +func (r *results) addDocumentsBatchWithLock(batch []doc.Document) error { for i := range batch { _, size, err := r.addDocumentWithLock(batch[i]) if err != nil { @@ -128,29 +120,29 @@ func (r *results) addDocumentsBatchWithLock(batch []doc.Metadata) error { return nil } -func (r *results) addDocumentWithLock(d doc.Metadata) (bool, int, error) { - if len(d.ID) == 0 { - return false, r.resultsMap.Len(), errUnableToAddResultMissingID +func (r *results) addDocumentWithLock(w doc.Document) (bool, int, error) { + id, err := docs.ReadIDFromDocument(w) + if err != nil { + return false, r.resultsMap.Len(), err } - // NB: can cast the []byte -> ident.ID to avoid an alloc - // before we're sure we need it. - tsID := ident.BytesID(d.ID) + if len(id) == 0 { + return false, r.resultsMap.Len(), errUnableToAddResultMissingID + } // Need to apply filter if set first. - if r.opts.FilterID != nil && !r.opts.FilterID(tsID) { + if r.opts.FilterID != nil && !r.opts.FilterID(ident.BytesID(id)) { return false, r.resultsMap.Len(), nil } // check if it already exists in the map. - if r.resultsMap.Contains(tsID) { + if r.resultsMap.Contains(id) { return false, r.resultsMap.Len(), nil } - tags := convert.ToSeriesTags(d, convert.Opts{NoClone: true}) // It is assumed that the document is valid for the lifetime of the index // results. - r.resultsMap.SetUnsafe(tsID, tags, resultMapNoFinalizeOpts) + r.resultsMap.SetUnsafe(id, w, resultMapNoFinalizeOpts) return true, r.resultsMap.Len(), nil } diff --git a/src/dbnode/storage/index/results_map_gen.go b/src/dbnode/storage/index/results_map_gen.go index 9ef643dc5d..748acfbe3b 100644 --- a/src/dbnode/storage/index/results_map_gen.go +++ b/src/dbnode/storage/index/results_map_gen.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -25,9 +25,33 @@ package index import ( - "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/m3ninx/doc" ) +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// This file was automatically generated by genny. +// Any changes will be lost if this file is regenerated. +// see https://github.com/mauricelam/genny + // Copyright (c) 2018 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -53,16 +77,16 @@ import ( type ResultsMapHash uint64 // ResultsMapHashFn is the hash function to execute when hashing a key. -type ResultsMapHashFn func(ident.ID) ResultsMapHash +type ResultsMapHashFn func([]byte) ResultsMapHash // ResultsMapEqualsFn is the equals key function to execute when detecting equality of a key. -type ResultsMapEqualsFn func(ident.ID, ident.ID) bool +type ResultsMapEqualsFn func([]byte, []byte) bool // ResultsMapCopyFn is the copy key function to execute when copying the key. -type ResultsMapCopyFn func(ident.ID) ident.ID +type ResultsMapCopyFn func([]byte) []byte // ResultsMapFinalizeFn is the finalize key function to execute when finished with a key. -type ResultsMapFinalizeFn func(ident.ID) +type ResultsMapFinalizeFn func([]byte) // ResultsMap uses the genny package to provide a generic hash map that can be specialized // by running the following command from this root of the repository: @@ -115,21 +139,21 @@ type ResultsMapEntry struct { // key is used to check equality on lookups to resolve collisions key _ResultsMapKey // value type stored - value ident.TagIterator + value doc.Document } type _ResultsMapKey struct { - key ident.ID + key []byte finalize bool } // Key returns the map entry key. -func (e ResultsMapEntry) Key() ident.ID { +func (e ResultsMapEntry) Key() []byte { return e.key.key } // Value returns the map entry value. -func (e ResultsMapEntry) Value() ident.TagIterator { +func (e ResultsMapEntry) Value() doc.Document { return e.value } @@ -143,7 +167,7 @@ func _ResultsMapAlloc(opts _ResultsMapOptions) *ResultsMap { return m } -func (m *ResultsMap) newMapKey(k ident.ID, opts _ResultsMapKeyOptions) _ResultsMapKey { +func (m *ResultsMap) newMapKey(k []byte, opts _ResultsMapKeyOptions) _ResultsMapKey { key := _ResultsMapKey{key: k, finalize: opts.finalizeKey} if !opts.copyKey { return key @@ -161,7 +185,7 @@ func (m *ResultsMap) removeMapKey(hash ResultsMapHash, key _ResultsMapKey) { } // Get returns a value in the map for an identifier if found. -func (m *ResultsMap) Get(k ident.ID) (ident.TagIterator, bool) { +func (m *ResultsMap) Get(k []byte) (doc.Document, bool) { hash := m.hash(k) for entry, ok := m.lookup[hash]; ok; entry, ok = m.lookup[hash] { if m.equals(entry.key.key, k) { @@ -170,12 +194,12 @@ func (m *ResultsMap) Get(k ident.ID) (ident.TagIterator, bool) { // Linear probe to "next" to this entry (really a rehash) hash++ } - var empty ident.TagIterator + var empty doc.Document return empty, false } // Set will set the value for an identifier. -func (m *ResultsMap) Set(k ident.ID, v ident.TagIterator) { +func (m *ResultsMap) Set(k []byte, v doc.Document) { m.set(k, v, _ResultsMapKeyOptions{ copyKey: true, finalizeKey: m.finalize != nil, @@ -191,7 +215,7 @@ type ResultsMapSetUnsafeOptions struct { // SetUnsafe will set the value for an identifier with unsafe options for how // the map treats the key. -func (m *ResultsMap) SetUnsafe(k ident.ID, v ident.TagIterator, opts ResultsMapSetUnsafeOptions) { +func (m *ResultsMap) SetUnsafe(k []byte, v doc.Document, opts ResultsMapSetUnsafeOptions) { m.set(k, v, _ResultsMapKeyOptions{ copyKey: !opts.NoCopyKey, finalizeKey: !opts.NoFinalizeKey, @@ -203,7 +227,7 @@ type _ResultsMapKeyOptions struct { finalizeKey bool } -func (m *ResultsMap) set(k ident.ID, v ident.TagIterator, opts _ResultsMapKeyOptions) { +func (m *ResultsMap) set(k []byte, v doc.Document, opts _ResultsMapKeyOptions) { hash := m.hash(k) for entry, ok := m.lookup[hash]; ok; entry, ok = m.lookup[hash] { if m.equals(entry.key.key, k) { @@ -237,13 +261,13 @@ func (m *ResultsMap) Len() int { // Contains returns true if value exists for key, false otherwise, it is // shorthand for a call to Get that doesn't return the value. -func (m *ResultsMap) Contains(k ident.ID) bool { +func (m *ResultsMap) Contains(k []byte) bool { _, ok := m.Get(k) return ok } // Delete will remove a value set in the map for the specified key. -func (m *ResultsMap) Delete(k ident.ID) { +func (m *ResultsMap) Delete(k []byte) { hash := m.hash(k) for entry, ok := m.lookup[hash]; ok; entry, ok = m.lookup[hash] { if m.equals(entry.key.key, k) { diff --git a/src/dbnode/storage/index/results_new_map.go b/src/dbnode/storage/index/results_new_map.go index 6620400594..7d702d548c 100644 --- a/src/dbnode/storage/index/results_new_map.go +++ b/src/dbnode/storage/index/results_new_map.go @@ -21,9 +21,11 @@ package index import ( - "github.com/m3db/m3/src/x/ident" + "bytes" "github.com/cespare/xxhash/v2" + + "github.com/m3db/m3/src/x/ident" ) const ( @@ -32,17 +34,16 @@ const ( func newResultsMap(idPool ident.Pool) *ResultsMap { return _ResultsMapAlloc(_ResultsMapOptions{ - hash: func(k ident.ID) ResultsMapHash { - return ResultsMapHash(xxhash.Sum64(k.Bytes())) - }, - equals: func(x, y ident.ID) bool { - return x.Equal(y) + hash: func(k []byte) ResultsMapHash { + return ResultsMapHash(xxhash.Sum64(k)) }, - copy: func(k ident.ID) ident.ID { - return idPool.Clone(k) + equals: bytes.Equal, + // TODO(nate); are these copy and finalize implementations right? + copy: func(k []byte) []byte { + return idPool.Clone(ident.BytesID(k)).Bytes() }, - finalize: func(k ident.ID) { - k.Finalize() + finalize: func(k []byte) { + // NB(nate): no-op for bytes IDs }, initialSize: defaultInitialResultsMapSize, }) diff --git a/src/dbnode/storage/index/results_test.go b/src/dbnode/storage/index/results_test.go index 3dc91b5947..41d7b4beaf 100644 --- a/src/dbnode/storage/index/results_test.go +++ b/src/dbnode/storage/index/results_test.go @@ -24,7 +24,9 @@ import ( "bytes" "testing" + idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" xtest "github.com/m3db/m3/src/x/test" @@ -59,7 +61,9 @@ func TestResultsInsertInvalid(t *testing.T) { assert.True(t, res.EnforceLimits()) dInvalid := doc.Metadata{ID: nil} - size, docsCount, err := res.AddDocuments([]doc.Metadata{dInvalid}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(dInvalid), + }) require.Error(t, err) require.Equal(t, 0, size) require.Equal(t, 1, docsCount) @@ -71,7 +75,9 @@ func TestResultsInsertInvalid(t *testing.T) { func TestResultsInsertIdempotency(t *testing.T) { res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) dValid := doc.Metadata{ID: []byte("abc")} - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(dValid), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -79,7 +85,9 @@ func TestResultsInsertIdempotency(t *testing.T) { require.Equal(t, 1, res.Size()) require.Equal(t, 1, res.TotalDocsCount()) - size, docsCount, err = res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err = res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(dValid), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 2, docsCount) @@ -92,7 +100,10 @@ func TestResultsInsertBatchOfTwo(t *testing.T) { res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) d1 := doc.Metadata{ID: []byte("d1")} d2 := doc.Metadata{ID: []byte("d2")} - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1, d2}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(d1), + doc.NewDocumentFromMetadata(d2), + }) require.NoError(t, err) require.Equal(t, 2, size) require.Equal(t, 2, docsCount) @@ -104,7 +115,9 @@ func TestResultsInsertBatchOfTwo(t *testing.T) { func TestResultsFirstInsertWins(t *testing.T) { res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) d1 := doc.Metadata{ID: []byte("abc")} - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(d1), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -112,8 +125,10 @@ func TestResultsFirstInsertWins(t *testing.T) { require.Equal(t, 1, res.Size()) require.Equal(t, 1, res.TotalDocsCount()) - tags, ok := res.Map().Get(ident.StringID("abc")) + d, ok := res.Map().Get(d1.ID) require.True(t, ok) + + tags := documentToTagIter(t, d) require.Equal(t, 0, tags.Remaining()) d2 := doc.Metadata{ @@ -124,7 +139,9 @@ func TestResultsFirstInsertWins(t *testing.T) { Value: []byte("bar"), }, }} - size, docsCount, err = res.AddDocuments([]doc.Metadata{d2}) + size, docsCount, err = res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(d2), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 2, docsCount) @@ -132,21 +149,27 @@ func TestResultsFirstInsertWins(t *testing.T) { require.Equal(t, 1, res.Size()) require.Equal(t, 2, res.TotalDocsCount()) - tags, ok = res.Map().Get(ident.StringID("abc")) + d, ok = res.Map().Get([]byte("abc")) require.True(t, ok) + + tags = documentToTagIter(t, d) require.Equal(t, 0, tags.Remaining()) } func TestResultsInsertContains(t *testing.T) { res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) dValid := doc.Metadata{ID: []byte("abc")} - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(dValid), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) - tags, ok := res.Map().Get(ident.StringID("abc")) + d, ok := res.Map().Get([]byte("abc")) require.True(t, ok) + + tags := documentToTagIter(t, d) require.Equal(t, 0, tags.Remaining()) } @@ -155,7 +178,9 @@ func TestResultsInsertDoesNotCopy(t *testing.T) { dValid := doc.Metadata{ID: []byte("abc"), Fields: []doc.Field{ {Name: []byte("name"), Value: []byte("value")}, }} - size, docsCount, err := res.AddDocuments([]doc.Metadata{dValid}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(dValid), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) @@ -165,9 +190,10 @@ func TestResultsInsertDoesNotCopy(t *testing.T) { // Our genny generated maps don't provide access to MapEntry directly, // so we iterate over the map to find the added entry. Could avoid this // in the future if we expose `func (m *Map) Entry(k Key) Entry {}`. + reader := docs.NewEncodedDocumentReader() for _, entry := range res.Map().Iter() { // see if this key has the same value as the added document's ID. - key := entry.Key().Bytes() + key := entry.Key() if !bytes.Equal(dValid.ID, key) { continue } @@ -175,7 +201,11 @@ func TestResultsInsertDoesNotCopy(t *testing.T) { // Ensure the underlying []byte for ID/Fields is the same. require.True(t, xtest.ByteSlicesBackedBySameData(key, dValid.ID)) - tags := entry.Value() + d := entry.Value() + m, err := docs.GetFromDocument(d, reader) + require.NoError(t, err) + + tags := idxconvert.ToSeriesTags(m, idxconvert.Opts{NoClone: true}) for _, f := range dValid.Fields { fName := f.Name fValue := f.Value @@ -200,17 +230,21 @@ func TestResultsInsertDoesNotCopy(t *testing.T) { func TestResultsReset(t *testing.T) { res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) d1 := doc.Metadata{ID: []byte("abc")} - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(d1), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) - tags, ok := res.Map().Get(ident.StringID("abc")) + d, ok := res.Map().Get([]byte("abc")) require.True(t, ok) + + tags := documentToTagIter(t, d) require.Equal(t, 0, tags.Remaining()) res.Reset(nil, QueryResultsOptions{}) - _, ok = res.Map().Get(ident.StringID("abc")) + _, ok = res.Map().Get([]byte("abc")) require.False(t, ok) require.Equal(t, 0, tags.Remaining()) require.Equal(t, 0, res.Size()) @@ -234,29 +268,34 @@ func TestFinalize(t *testing.T) { // Create a Results and insert some data. res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) d1 := doc.Metadata{ID: []byte("abc")} - size, docsCount, err := res.AddDocuments([]doc.Metadata{d1}) + size, docsCount, err := res.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(d1), + }) require.NoError(t, err) require.Equal(t, 1, size) require.Equal(t, 1, docsCount) // Ensure the data is present. - tags, ok := res.Map().Get(ident.StringID("abc")) + d, ok := res.Map().Get([]byte("abc")) require.True(t, ok) + + tags := documentToTagIter(t, d) require.Equal(t, 0, tags.Remaining()) // Call Finalize() to reset the Results. res.Finalize() // Ensure data was removed by call to Finalize(). - tags, ok = res.Map().Get(ident.StringID("abc")) + _, ok = res.Map().Get([]byte("abc")) require.False(t, ok) require.Equal(t, 0, res.Size()) require.Equal(t, 0, res.TotalDocsCount()) +} - for _, entry := range res.Map().Iter() { - id, _ := entry.Key(), entry.Value() - require.False(t, id.IsNoFinalize()) - // TODO(rartoul): Could verify tags are NoFinalize() as well if - // they had that method. - } +func documentToTagIter(t *testing.T, doc doc.Document) ident.TagIterator { + reader := docs.NewEncodedDocumentReader() + m, err := docs.GetFromDocument(doc, reader) + require.NoError(t, err) + + return idxconvert.ToSeriesTags(m, idxconvert.Opts{NoClone: true}) } diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 12ceca4859..9e5e07ec07 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -164,7 +164,7 @@ type BaseResults interface { // modified after this function returns without affecting the results map. // TODO(r): We will need to change this behavior once index fields are // mutable and the most recent need to shadow older entries. - AddDocuments(batch []doc.Metadata) (size, docsCount int, err error) + AddDocuments(batch []doc.Document) (size, docsCount int, err error) // Finalize releases any resources held by the Results object, // including returning it to a backing pool. diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index 4cf8a0011e..af6b707584 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -22,9 +22,11 @@ package index import ( "errors" + "fmt" "sync" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" ) @@ -94,7 +96,7 @@ func (r *wideResults) EnforceLimits() bool { return false } -func (r *wideResults) AddDocuments(batch []doc.Metadata) (int, int, error) { +func (r *wideResults) AddDocuments(batch []doc.Document) (int, int, error) { var size, totalDocsCount int r.RLock() size, totalDocsCount = r.size, r.totalDocsCount @@ -124,7 +126,7 @@ func (r *wideResults) AddDocuments(batch []doc.Metadata) (int, int, error) { return size, totalDocsCount, err } -func (r *wideResults) addDocumentsBatchWithLock(batch []doc.Metadata) error { +func (r *wideResults) addDocumentsBatchWithLock(batch []doc.Document) error { for i := range batch { if err := r.addDocumentWithLock(batch[i]); err != nil { return err @@ -134,12 +136,16 @@ func (r *wideResults) addDocumentsBatchWithLock(batch []doc.Metadata) error { return nil } -func (r *wideResults) addDocumentWithLock(d doc.Metadata) error { - if len(d.ID) == 0 { +func (r *wideResults) addDocumentWithLock(w doc.Document) error { + docID, err := docs.ReadIDFromDocument(w) + if err != nil { + return fmt.Errorf("unable to decode document ID: %w", err) + } + if len(docID) == 0 { return errUnableToAddResultMissingID } - var tsID ident.ID = ident.BytesID(d.ID) + var tsID ident.ID = ident.BytesID(docID) documentShard, documentShardOwned := r.shardFilter(tsID) if !documentShardOwned { diff --git a/src/dbnode/storage/index/wide_query_results_test.go b/src/dbnode/storage/index/wide_query_results_test.go index 2f57a729ee..04539ddfed 100644 --- a/src/dbnode/storage/index/wide_query_results_test.go +++ b/src/dbnode/storage/index/wide_query_results_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/m3db/m3/src/m3ninx/doc" + encoding "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" @@ -52,18 +53,19 @@ func init() { bytesPool.Init() } -func buildDocs(documentCount int, batchSize int) [][]doc.Metadata { +func buildDocs(documentCount int, batchSize int) [][]doc.Document { docBatches := int(math.Ceil(float64(documentCount) / float64(batchSize))) - docs := make([][]doc.Metadata, 0, docBatches) + docs := make([][]doc.Document, 0, docBatches) for i := 0; i < docBatches; i++ { - batch := make([]doc.Metadata, 0, batchSize) + batch := make([]doc.Document, 0, batchSize) for j := 0; j < batchSize; j++ { val := i*batchSize + j if val < documentCount { val := fmt.Sprintf("foo%d", i*batchSize+j) - batch = append(batch, doc.Metadata{ - ID: []byte(val), - }) + batch = append(batch, doc.NewDocumentFromMetadata( + doc.Metadata{ + ID: []byte(val), + })) } } @@ -73,12 +75,15 @@ func buildDocs(documentCount int, batchSize int) [][]doc.Metadata { return docs } -func buildExpected(_ *testing.T, docs [][]doc.Metadata) [][]string { +func buildExpected(t *testing.T, docs [][]doc.Document) [][]string { expected := make([][]string, 0, len(docs)) + reader := encoding.NewEncodedDocumentReader() for _, batch := range docs { idBatch := make([]string, 0, len(batch)) - for _, doc := range batch { - idBatch = append(idBatch, string(doc.ID)) + for _, document := range batch { // nolint:gocritic + m, err := encoding.GetFromDocument(document, reader) + require.NoError(t, err) + idBatch = append(idBatch, string(m.ID)) } expected = append(expected, idBatch) diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 53bade47c0..a4cbadcdfa 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -831,10 +831,13 @@ func TestLimits(t *testing.T) { opts interface{}, results index.BaseResults, logFields interface{}) (bool, error) { - _, _, err = results.AddDocuments([]doc.Metadata{ + _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2. - {ID: []byte("A")}, - {ID: []byte("A")}, + // Byte array represents ID encoded as bytes. + // 1 represents the ID length in bytes, 49 is the ID itself which is + // the ASCII value for A + doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("A")}), + doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("A")}), }) require.NoError(t, err) return false, nil diff --git a/src/dbnode/storage/index_queue_forward_write_test.go b/src/dbnode/storage/index_queue_forward_write_test.go index 05ddd534e4..265b5ac7fe 100644 --- a/src/dbnode/storage/index_queue_forward_write_test.go +++ b/src/dbnode/storage/index_queue_forward_write_test.go @@ -29,11 +29,13 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/index" + idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts/writes" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3/src/m3ninx/doc" m3ninxidx "github.com/m3db/m3/src/m3ninx/idx" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -143,6 +145,7 @@ func TestNamespaceForwardIndexInsertQuery(t *testing.T) { // write was correctly indexed to both. nextBlockTime := now.Add(blockSize) queryTimes := []time.Time{now, nextBlockTime} + reader := docs.NewEncodedDocumentReader() for _, ts := range queryTimes { res, err := idx.Query(ctx, index.Query{Query: reQuery}, index.QueryOptions{ StartInclusive: ts.Add(-1 * time.Minute), @@ -154,7 +157,11 @@ func TestNamespaceForwardIndexInsertQuery(t *testing.T) { results := res.Results require.Equal(t, "testns1", results.Namespace().String()) - tags, ok := results.Map().Get(ident.StringID("foo")) + d, ok := results.Map().Get(ident.BytesID("foo")) + md, err := docs.GetFromDocument(d, reader) + require.NoError(t, err) + tags := idxconvert.ToSeriesTags(md, idxconvert.Opts{NoClone: true}) + require.True(t, ok) require.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("name", "value")).Matches( diff --git a/src/dbnode/storage/index_queue_test.go b/src/dbnode/storage/index_queue_test.go index 64d1e6bf1f..833b8d7fc1 100644 --- a/src/dbnode/storage/index_queue_test.go +++ b/src/dbnode/storage/index_queue_test.go @@ -29,8 +29,10 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" m3dberrors "github.com/m3db/m3/src/dbnode/storage/errors" "github.com/m3db/m3/src/dbnode/storage/index" + idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/m3ninx/doc" m3ninxidx "github.com/m3db/m3/src/m3ninx/idx" + "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -347,7 +349,12 @@ func TestNamespaceIndexInsertQuery(t *testing.T) { results := res.Results assert.Equal(t, "testns1", results.Namespace().String()) - tags, ok := results.Map().Get(ident.StringID("foo")) + reader := docs.NewEncodedDocumentReader() + d, ok := results.Map().Get(ident.BytesID("foo")) + md, err := docs.GetFromDocument(d, reader) + require.NoError(t, err) + tags := idxconvert.ToSeriesTags(md, idxconvert.Opts{NoClone: true}) + assert.True(t, ok) assert.True(t, ident.NewTagIterMatcher( ident.MustNewTagStringsIterator("name", "value")).Matches( diff --git a/src/m3ninx/search/proptest/concurrent_test.go b/src/m3ninx/search/proptest/concurrent_test.go index 9d728ab3a1..02009512c8 100644 --- a/src/m3ninx/search/proptest/concurrent_test.go +++ b/src/m3ninx/search/proptest/concurrent_test.go @@ -61,7 +61,7 @@ func TestConcurrentQueries(t *testing.T) { require.NoError(t, err) matchedDocs, err := collectDocs(dOrg) require.NoError(t, err) - docMatcher, err := newDocumentIteratorMatcher(matchedDocs...) + docMatcher, err := newDocumentIteratorMatcher(t, matchedDocs...) require.NoError(t, err) var ( diff --git a/src/m3ninx/search/proptest/issue865_test.go b/src/m3ninx/search/proptest/issue865_test.go index 3a257a50e3..96ce91f00e 100644 --- a/src/m3ninx/search/proptest/issue865_test.go +++ b/src/m3ninx/search/proptest/issue865_test.go @@ -45,35 +45,35 @@ var ( doc1 = doc.Metadata{ ID: []byte("__name__=node_cpu_seconds_total,cpu=1,instance=m3db-node01:9100,job=node-exporter,mode=system,"), Fields: []doc.Field{ - doc.Field{[]byte("cpu"), []byte("1")}, - doc.Field{[]byte("__name__"), []byte("node_cpu_seconds_total")}, - doc.Field{[]byte("instance"), []byte("m3db-node01:9100")}, - doc.Field{[]byte("job"), []byte("node-exporter")}, - doc.Field{[]byte("mode"), []byte("system")}, + {[]byte("cpu"), []byte("1")}, + {[]byte("__name__"), []byte("node_cpu_seconds_total")}, + {[]byte("instance"), []byte("m3db-node01:9100")}, + {[]byte("job"), []byte("node-exporter")}, + {[]byte("mode"), []byte("system")}, }, } doc2 = doc.Metadata{ ID: []byte("__name__=node_memory_SwapTotal_bytes,instance=m3db-node01:9100,job=node-exporter,"), Fields: []doc.Field{ - doc.Field{[]byte("__name__"), []byte("node_memory_SwapTotal_bytes")}, - doc.Field{[]byte("instance"), []byte("m3db-node01:9100")}, - doc.Field{[]byte("job"), []byte("node-exporter")}, + {[]byte("__name__"), []byte("node_memory_SwapTotal_bytes")}, + {[]byte("instance"), []byte("m3db-node01:9100")}, + {[]byte("job"), []byte("node-exporter")}, }, } doc3 = doc.Metadata{ ID: []byte("__name__=node_memory_SwapTotal_bytes,instance=alertmanager03:9100,job=node-exporter,"), Fields: []doc.Field{ - doc.Field{[]byte("__name__"), []byte("node_memory_SwapTotal_bytes")}, - doc.Field{[]byte("instance"), []byte("alertmanager03:9100")}, - doc.Field{[]byte("job"), []byte("node-exporter")}, + {[]byte("__name__"), []byte("node_memory_SwapTotal_bytes")}, + {[]byte("instance"), []byte("alertmanager03:9100")}, + {[]byte("job"), []byte("node-exporter")}, }, } doc4 = doc.Metadata{ ID: []byte("__name__=node_memory_SwapTotal_bytes,instance=prometheus01:9100,job=node-exporter,"), Fields: []doc.Field{ - doc.Field{[]byte("__name__"), []byte("node_memory_SwapTotal_bytes")}, - doc.Field{[]byte("instance"), []byte("prometheus01:9100")}, - doc.Field{[]byte("job"), []byte("node-exporter")}, + {[]byte("__name__"), []byte("node_memory_SwapTotal_bytes")}, + {[]byte("instance"), []byte("prometheus01:9100")}, + {[]byte("job"), []byte("node-exporter")}, }, } simpleTestDocs = []doc.Metadata{doc1, doc2, doc3, doc4} @@ -87,7 +87,7 @@ func TestAnyDistributionOfDocsDoesNotAffectQuery(t *testing.T) { parameters.Rng = rand.New(rand.NewSource(seed)) properties := gopter.NewProperties(parameters) - docMatcher, err := newDocumentIteratorMatcher(doc2) + docMatcher, err := newDocumentIteratorMatcher(t, doc2) require.NoError(t, err) properties.Property("Any distribution of simple documents does not affect query results", prop.ForAll( func(i propTestInput) (bool, error) { diff --git a/src/m3ninx/search/proptest/prop_test.go b/src/m3ninx/search/proptest/prop_test.go index 0ecc89440e..a58a34d843 100644 --- a/src/m3ninx/search/proptest/prop_test.go +++ b/src/m3ninx/search/proptest/prop_test.go @@ -58,7 +58,7 @@ func TestSegmentDistributionDoesNotAffectQuery(t *testing.T) { } matchedDocs, err := collectDocs(dOrg) require.NoError(t, err) - docMatcher, err := newDocumentIteratorMatcher(matchedDocs...) + docMatcher, err := newDocumentIteratorMatcher(t, matchedDocs...) require.NoError(t, err) segments := i.generate(t, lotsTestDocuments) @@ -115,7 +115,7 @@ func TestFSTSimpleSegmentsQueryTheSame(t *testing.T) { } matchedDocs, err := collectDocs(dOrg) require.NoError(t, err) - docMatcher, err := newDocumentIteratorMatcher(matchedDocs...) + docMatcher, err := newDocumentIteratorMatcher(t, matchedDocs...) require.NoError(t, err) rFst, err := fstSeg.Reader() diff --git a/src/m3ninx/search/proptest/segment_gen.go b/src/m3ninx/search/proptest/segment_gen.go index aa3bf83c4e..2b62319c6f 100644 --- a/src/m3ninx/search/proptest/segment_gen.go +++ b/src/m3ninx/search/proptest/segment_gen.go @@ -40,8 +40,8 @@ var ( fstOptions = fst.NewOptions() ) -func collectDocs(iter doc.MetadataIterator) ([]doc.Metadata, error) { - var docs []doc.Metadata +func collectDocs(iter doc.Iterator) ([]doc.Document, error) { + var docs []doc.Document for iter.Next() { docs = append(docs, iter.Current()) } diff --git a/src/m3ninx/search/proptest/util.go b/src/m3ninx/search/proptest/util.go index 60dd08e341..2f1b96bbf2 100644 --- a/src/m3ninx/search/proptest/util.go +++ b/src/m3ninx/search/proptest/util.go @@ -22,40 +22,52 @@ package proptest import ( "fmt" + "testing" + + "github.com/stretchr/testify/require" "github.com/m3db/m3/src/m3ninx/doc" + idxdocs "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" ) type documentIteratorMatcher struct { - expectedDocs map[string]doc.Metadata + expectedDocs map[string]doc.Document + t *testing.T } -func newDocumentIteratorMatcher(docs ...doc.Metadata) (*documentIteratorMatcher, error) { - docMap := make(map[string]doc.Metadata, len(docs)) +func newDocumentIteratorMatcher(t *testing.T, docs ...doc.Document) (*documentIteratorMatcher, error) { + docMap := make(map[string]doc.Document, len(docs)) for _, d := range docs { - id := string(d.ID) + rawID, err := idxdocs.ReadIDFromDocument(d) + id := string(rawID) + require.NoError(t, err) if _, ok := docMap[id]; ok { return nil, fmt.Errorf("received document with duplicate id: %v", d) } docMap[id] = d } - return &documentIteratorMatcher{docMap}, nil + return &documentIteratorMatcher{ + expectedDocs: docMap, + t: t, + }, nil } // Matches returns whether the provided iterator matches the collection of provided docs. -func (m *documentIteratorMatcher) Matches(i doc.MetadataIterator) error { - pendingDocIDs := make(map[string]doc.Metadata, len(m.expectedDocs)) +func (m *documentIteratorMatcher) Matches(i doc.Iterator) error { + pendingDocIDs := make(map[string]doc.Document, len(m.expectedDocs)) for id := range m.expectedDocs { pendingDocIDs[id] = m.expectedDocs[id] } for i.Next() { d := i.Current() - id := string(d.ID) + rawID, err := idxdocs.ReadIDFromDocument(d) + require.NoError(m.t, err) + id := string(rawID) expectedDoc, ok := m.expectedDocs[id] if !ok { return fmt.Errorf("received un-expected document: %+v", d) } - if !expectedDoc.Equal(d) { + if !m.compareDocs(expectedDoc, d) { return fmt.Errorf("received document: %+v did not match expected doc %+v", d, expectedDoc) } delete(pendingDocIDs, id) @@ -71,3 +83,14 @@ func (m *documentIteratorMatcher) Matches(i doc.MetadataIterator) error { } return nil } + +func (m *documentIteratorMatcher) compareDocs(d1 doc.Document, d2 doc.Document) bool { + docReader := idxdocs.NewEncodedDocumentReader() + d1Metadata, err := idxdocs.GetFromDocument(d1, docReader) + require.NoError(m.t, err) + + d2Metadata, err := idxdocs.GetFromDocument(d2, docReader) + require.NoError(m.t, err) + + return d1Metadata.Equal(d2Metadata) +}