diff --git a/src/dbnode/integration/fetch_tagged_quorum_test.go b/src/dbnode/integration/fetch_tagged_quorum_test.go index 9ffa28c7d0..79078717e4 100644 --- a/src/dbnode/integration/fetch_tagged_quorum_test.go +++ b/src/dbnode/integration/fetch_tagged_quorum_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/x/context" @@ -317,7 +318,7 @@ func writeTagged( defer ctx.BlockingClose() for _, n := range nodes { require.NoError(t, n.DB().WriteTagged(ctx, testNamespaces[0], ident.StringID("quorumTest"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("foo", "bar"), ident.StringTag("boo", "baz"))), + convert.NewTagsMetadataResolver(ident.NewTags(ident.StringTag("foo", "bar"), ident.StringTag("boo", "baz"))), n.NowFn()(), 42, xtime.Second, nil)) } } diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index f8596675f7..1340e2b1ed 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -249,7 +249,6 @@ func (s *serviceState) DecNumOutstandingReadRPCs() { type pools struct { id ident.Pool tagEncoder serialize.TagEncoderPool - tagDecoder serialize.TagDecoderPool checkedBytesWrapper xpool.CheckedBytesWrapperPool segmentsArray segmentsArrayPool writeBatchPooledReqPool *writeBatchPooledReqPool @@ -321,7 +320,7 @@ func NewService(db storage.Database, opts tchannelthrift.Options) Service { writeBatchPoolSize = maxWriteReqs } writeBatchPooledReqPool := newWriteBatchPooledReqPool(writeBatchPoolSize, iopts) - writeBatchPooledReqPool.Init(opts.TagDecoderPool()) + writeBatchPooledReqPool.Init() return &service{ state: serviceState{ @@ -343,7 +342,6 @@ func NewService(db storage.Database, opts tchannelthrift.Options) Service { id: opts.IdentifierPool(), checkedBytesWrapper: opts.CheckedBytesWrapperPool(), tagEncoder: opts.TagEncoderPool(), - tagDecoder: opts.TagDecoderPool(), segmentsArray: segmentPool, writeBatchPooledReqPool: writeBatchPooledReqPool, blockMetadataV2: opts.BlockMetadataV2Pool(), @@ -1701,7 +1699,7 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) if err = db.WriteTagged(ctx, s.pools.id.GetStringID(ctx, req.NameSpace), s.pools.id.GetStringID(ctx, req.ID), - iter, xtime.FromNormalizedTime(dp.Timestamp, d), + idxconvert.NewTagsIterMetadataResolver(iter), xtime.FromNormalizedTime(dp.Timestamp, d), dp.Value, unit, dp.Annotation); err != nil { s.metrics.writeTagged.ReportError(s.nowFn().Sub(callStart)) return convert.ToRPCError(err) @@ -1965,18 +1963,11 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedB continue } - dec, err := s.newPooledTagsDecoder(ctx, elem.EncodedTags, pooledReq) - if err != nil { - nonRetryableErrors++ - pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, err)) - continue - } - seriesID := s.newPooledID(ctx, elem.ID, pooledReq) + batchWriter.AddTagged( i, seriesID, - dec, elem.EncodedTags, xtime.FromNormalizedTime(elem.Datapoint.Timestamp, d), elem.Datapoint.Value, @@ -2086,18 +2077,11 @@ func (s *service) WriteTaggedBatchRawV2(tctx thrift.Context, req *rpc.WriteTagge continue } - dec, err := s.newPooledTagsDecoder(ctx, elem.EncodedTags, pooledReq) - if err != nil { - nonRetryableErrors++ - pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, err)) - continue - } - seriesID := s.newPooledID(ctx, elem.ID, pooledReq) + batchWriter.AddTagged( i, seriesID, - dec, elem.EncodedTags, xtime.FromNormalizedTime(elem.Datapoint.Timestamp, d), elem.Datapoint.Value, @@ -2629,31 +2613,6 @@ func readEncodedResultSegment( return converted.Segments, nil } -func (s *service) newTagsDecoder(ctx context.Context, encodedTags []byte) (serialize.TagDecoder, error) { - checkedBytes := s.pools.checkedBytesWrapper.Get(encodedTags) - dec := s.pools.tagDecoder.Get() - ctx.RegisterCloser(dec) - dec.Reset(checkedBytes) - if err := dec.Err(); err != nil { - return nil, err - } - return dec, nil -} - -func (s *service) newPooledTagsDecoder( - ctx context.Context, - encodedTags []byte, - p *writeBatchPooledReq, -) (serialize.TagDecoder, error) { - if decoder, ok := p.nextPooledTagDecoder(encodedTags); ok { - if err := decoder.Err(); err != nil { - return nil, err - } - return decoder, nil - } - return s.newTagsDecoder(ctx, encodedTags) -} - func (s *service) newCloseableMetadataV2Result( res *rpc.FetchBlocksMetadataRawV2Result_, ) closeableMetadataV2Result { @@ -2709,23 +2668,6 @@ func (r *writeBatchPooledReq) nextPooledID(idBytes []byte) (ident.ID, bool) { return id, true } -func (r *writeBatchPooledReq) nextPooledTagDecoder(encodedTags []byte) (serialize.TagDecoder, bool) { - if r.pooledIDsUsed >= len(r.pooledIDs) { - return nil, false - } - - bytes := r.pooledIDs[r.pooledIDsUsed].bytes - bytes.IncRef() - bytes.Reset(encodedTags) - - decoder := r.pooledIDs[r.pooledIDsUsed].tagDecoder - decoder.Reset(bytes) - - r.pooledIDsUsed++ - - return decoder, true -} - func (r *writeBatchPooledReq) Finalize() { // Reset the pooledIDsUsed and decrement the ref counts for i := 0; i < r.pooledIDsUsed; i++ { @@ -2829,9 +2771,8 @@ func (r *writeBatchPooledReq) numNonRetryableErrors() int { } type writeBatchPooledReqID struct { - bytes checked.Bytes - id ident.ID - tagDecoder serialize.TagDecoder + bytes checked.Bytes + id ident.ID } type writeBatchPooledReqPool struct { @@ -2849,14 +2790,10 @@ func newWriteBatchPooledReqPool( return &writeBatchPooledReqPool{pool: pool} } -func (p *writeBatchPooledReqPool) Init( - tagDecoderPool serialize.TagDecoderPool, -) { +func (p *writeBatchPooledReqPool) Init() { p.pool.Init(func() interface{} { - // NB(r): Make pooled IDs 2x the default write batch size to account for - // write tagged which also has encoded tags, plus an extra one for the - // namespace - pooledIDs := make([]writeBatchPooledReqID, 1+(2*client.DefaultWriteBatchSize)) + // NB(r): Make pooled IDs the default write batch size plus an extra one for the namespace + pooledIDs := make([]writeBatchPooledReqID, 1+client.DefaultWriteBatchSize) for i := range pooledIDs { pooledIDs[i].bytes = checked.NewBytes(nil, nil) pooledIDs[i].id = ident.BinaryID(pooledIDs[i].bytes) @@ -2865,8 +2802,6 @@ func (p *writeBatchPooledReqPool) Init( // immediately dec a ref here to avoid calling get on this ID // being a valid call pooledIDs[i].bytes.DecRef() - // Also ready a tag decoder - pooledIDs[i].tagDecoder = tagDecoderPool.Get() } return &writeBatchPooledReq{ pooledIDs: pooledIDs, diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index d7a1767415..95a7943263 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index" + conv "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/storage/limits/permits" "github.com/m3db/m3/src/dbnode/storage/series" @@ -52,7 +53,6 @@ import ( "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" - "github.com/m3db/m3/src/x/serialize" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" @@ -1449,14 +1449,15 @@ func TestServiceFetchBlocksMetadataEndpointV2Raw(t *testing.T) { if len(expectedBlocks.tags.Values()) == 0 { require.Equal(t, 0, len(block.EncodedTags)) } else { - encodedTags := checked.NewBytes(block.EncodedTags, nil) - decoder := service.pools.tagDecoder.Get() - decoder.Reset(encodedTags) + id := ident.BinaryID(checked.NewBytes(block.ID, nil)) - expectedTags := ident.NewTagsIterator(expectedBlocks.tags) - require.True(t, ident.NewTagIterMatcher(expectedTags).Matches(decoder)) + actualTags, err := conv.FromSeriesIDAndEncodedTags(id.Bytes(), block.EncodedTags) + require.NoError(t, err) + + expectedTags, err := conv.FromSeriesIDAndTags(id, expectedBlocks.tags) + require.NoError(t, err) - decoder.Close() + require.True(t, expectedTags.Equal(actualTags)) } foundMatch := false @@ -2831,15 +2832,7 @@ func TestServiceWriteTaggedBatchRaw(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() - mockDecoder := serialize.NewMockTagDecoder(ctrl) - mockDecoder.EXPECT().Reset(gomock.Any()).AnyTimes() - mockDecoder.EXPECT().Err().Return(nil).AnyTimes() - mockDecoder.EXPECT().Close().AnyTimes() - mockDecoderPool := serialize.NewMockTagDecoderPool(ctrl) - mockDecoderPool.EXPECT().Get().Return(mockDecoder).AnyTimes() - - opts := tchannelthrift.NewOptions(). - SetTagDecoderPool(mockDecoderPool) + opts := tchannelthrift.NewOptions() service := NewService(mockDB, opts).(*service) @@ -2897,15 +2890,7 @@ func TestServiceWriteTaggedBatchRawV2(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() - mockDecoder := serialize.NewMockTagDecoder(ctrl) - mockDecoder.EXPECT().Reset(gomock.Any()).AnyTimes() - mockDecoder.EXPECT().Err().Return(nil).AnyTimes() - mockDecoder.EXPECT().Close().AnyTimes() - mockDecoderPool := serialize.NewMockTagDecoderPool(ctrl) - mockDecoderPool.EXPECT().Get().Return(mockDecoder).AnyTimes() - - opts := tchannelthrift.NewOptions(). - SetTagDecoderPool(mockDecoderPool) + opts := tchannelthrift.NewOptions() service := NewService(mockDB, opts).(*service) @@ -2964,15 +2949,7 @@ func TestServiceWriteTaggedBatchRawV2MultiNS(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() - mockDecoder := serialize.NewMockTagDecoder(ctrl) - mockDecoder.EXPECT().Reset(gomock.Any()).AnyTimes() - mockDecoder.EXPECT().Err().Return(nil).AnyTimes() - mockDecoder.EXPECT().Close().AnyTimes() - mockDecoderPool := serialize.NewMockTagDecoderPool(ctrl) - mockDecoderPool.EXPECT().Get().Return(mockDecoder).AnyTimes() - - opts := tchannelthrift.NewOptions(). - SetTagDecoderPool(mockDecoderPool) + opts := tchannelthrift.NewOptions() service := NewService(mockDB, opts).(*service) @@ -3079,15 +3056,7 @@ func TestServiceWriteTaggedBatchRawUnknownError(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() - mockDecoder := serialize.NewMockTagDecoder(ctrl) - mockDecoder.EXPECT().Reset(gomock.Any()).AnyTimes() - mockDecoder.EXPECT().Err().Return(nil).AnyTimes() - mockDecoder.EXPECT().Close().AnyTimes() - mockDecoderPool := serialize.NewMockTagDecoderPool(ctrl) - mockDecoderPool.EXPECT().Get().Return(mockDecoder).AnyTimes() - - opts := tchannelthrift.NewOptions(). - SetTagDecoderPool(mockDecoderPool) + opts := tchannelthrift.NewOptions() service := NewService(mockDB, opts).(*service) diff --git a/src/dbnode/network/server/tchannelthrift/options.go b/src/dbnode/network/server/tchannelthrift/options.go index 3f85c15131..4cda067ffe 100644 --- a/src/dbnode/network/server/tchannelthrift/options.go +++ b/src/dbnode/network/server/tchannelthrift/options.go @@ -71,11 +71,6 @@ func NewOptions() Options { poolOptions) tagEncoderPool.Init() - tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), - poolOptions) - tagDecoderPool.Init() - bytesWrapperPool := xpool.NewCheckedBytesWrapperPool(poolOptions) bytesWrapperPool.Init() @@ -86,7 +81,6 @@ func NewOptions() Options { blockMetadataV2Pool: NewBlockMetadataV2Pool(nil), blockMetadataV2SlicePool: NewBlockMetadataV2SlicePool(nil, 0), tagEncoderPool: tagEncoderPool, - tagDecoderPool: tagDecoderPool, checkedBytesWrapperPool: bytesWrapperPool, queryLimits: limits.NoOpQueryLimits(), permitsOptions: permits.NewOptions(), @@ -163,16 +157,6 @@ func (o *options) TagEncoderPool() serialize.TagEncoderPool { return o.tagEncoderPool } -func (o *options) SetTagDecoderPool(value serialize.TagDecoderPool) Options { - opts := *o - opts.tagDecoderPool = value - return &opts -} - -func (o *options) TagDecoderPool() serialize.TagDecoderPool { - return o.tagDecoderPool -} - func (o *options) SetCheckedBytesWrapperPool(value xpool.CheckedBytesWrapperPool) Options { opts := *o opts.checkedBytesWrapperPool = value diff --git a/src/dbnode/network/server/tchannelthrift/types.go b/src/dbnode/network/server/tchannelthrift/types.go index e7802c44e5..dfefc560e8 100644 --- a/src/dbnode/network/server/tchannelthrift/types.go +++ b/src/dbnode/network/server/tchannelthrift/types.go @@ -75,12 +75,6 @@ type Options interface { // TagEncoderPool returns the tag encoder pool. TagEncoderPool() serialize.TagEncoderPool - // SetTagDecoderPool sets the tag encoder pool. - SetTagDecoderPool(value serialize.TagDecoderPool) Options - - // TagDecoderPool returns the tag encoder pool. - TagDecoderPool() serialize.TagDecoderPool - // SetCheckedBytesWrapperPool sets the checked bytes wrapper pool. SetCheckedBytesWrapperPool(value xpool.CheckedBytesWrapperPool) Options diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index a38fcd87c5..2d38f8135a 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -1077,9 +1077,9 @@ func TestCommitLogBatchWriteDoesNotAddErroredOrSkippedSeries(t *testing.T) { tt := alignedStart.Add(time.Minute * time.Duration(i)) tagsIter := opts.FilesystemOptions().TagDecoderPool().Get() tagsIter.Reset(checked.NewBytes(testSeriesWrites[i].EncodedTags, nil)) - writes.AddTagged(i, testSeriesWrites[i].ID, tagsIter, + require.NoError(t, writes.AddTagged(i, testSeriesWrites[i].ID, testSeriesWrites[i].EncodedTags, - tt, float64(i)*10.5, xtime.Second, nil) + tt, float64(i)*10.5, xtime.Second, nil)) } writes.SetSkipWrite(0) diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index e5c44f829c..217e0c75dc 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -742,7 +742,6 @@ func Run(runOpts RunOptions) { SetTopologyInitializer(syncCfg.TopologyInitializer). SetIdentifierPool(opts.IdentifierPool()). SetTagEncoderPool(tagEncoderPool). - SetTagDecoderPool(tagDecoderPool). SetCheckedBytesWrapperPool(opts.CheckedBytesWrapperPool()). SetMaxOutstandingWriteRequests(cfg.Limits.MaxOutstandingWriteRequests). SetMaxOutstandingReadRequests(cfg.Limits.MaxOutstandingReadRequests). diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 96070adf8a..68813d7895 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/block" dberrors "github.com/m3db/m3/src/dbnode/storage/errors" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" @@ -697,7 +698,7 @@ func (d *db) WriteTagged( ctx context.Context, namespace ident.ID, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, @@ -709,7 +710,7 @@ func (d *db) WriteTagged( return err } - seriesWrite, err := n.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation) + seriesWrite, err := n.WriteTagged(ctx, id, tagResolver, timestamp, value, unit, annotation) if err != nil { return err } @@ -802,7 +803,7 @@ func (d *db) writeBatch( seriesWrite, err = n.WriteTagged( ctx, write.Write.Series.ID, - write.TagIter, + convert.NewEncodedTagsMetadataResolver(write.EncodedTags), write.Write.Datapoint.Timestamp, write.Write.Datapoint.Value, write.Write.Unit, diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index efe5f17b13..1b36596cf5 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/block" dberrors "github.com/m3db/m3/src/dbnode/storage/errors" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/tracepoint" @@ -870,13 +871,13 @@ func testDatabaseNamespaceIndexFunctions(t *testing.T, commitlogEnabled bool) { ns.EXPECT().WriteTagged(gomock.Any(), ident.NewIDMatcher("foo"), gomock.Any(), time.Time{}, 1.0, xtime.Second, nil).Return(seriesWrite, nil) require.NoError(t, d.WriteTagged(ctx, namespace, - id, tagsIter, time.Time{}, + id, convert.NewTagsIterMetadataResolver(tagsIter), time.Time{}, 1.0, xtime.Second, nil)) ns.EXPECT().WriteTagged(gomock.Any(), ident.NewIDMatcher("foo"), gomock.Any(), time.Time{}, 1.0, xtime.Second, nil).Return(SeriesWrite{}, fmt.Errorf("random err")) require.Error(t, d.WriteTagged(ctx, namespace, - ident.StringID("foo"), ident.EmptyTagIterator, time.Time{}, + ident.StringID("foo"), convert.EmptyTagMetadataResolver, time.Time{}, 1.0, xtime.Second, nil)) var ( @@ -1237,7 +1238,7 @@ func testDatabaseWriteBatch(t *testing.T, // in the WriteBatch slice. if tagged { batchWriter.AddTagged(i*2, ident.StringID(write.series), - tagsIter.Duplicate(), encodedTags.Bytes(), write.t, write.v, xtime.Second, nil) + encodedTags.Bytes(), write.t, write.v, xtime.Second, nil) wasWritten := write.err == nil ns.EXPECT(). WriteTagged(ctx, ident.NewIDMatcher(write.series), gomock.Any(), diff --git a/src/dbnode/storage/index/convert/convert.go b/src/dbnode/storage/index/convert/convert.go index 93465ff219..2b81550086 100644 --- a/src/dbnode/storage/index/convert/convert.go +++ b/src/dbnode/storage/index/convert/convert.go @@ -18,6 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// Package convert contains various conversions. package convert import ( diff --git a/src/dbnode/storage/index/convert/tag_resolver.go b/src/dbnode/storage/index/convert/tag_resolver.go new file mode 100644 index 0000000000..1e7c4ab11f --- /dev/null +++ b/src/dbnode/storage/index/convert/tag_resolver.go @@ -0,0 +1,99 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package convert + +import ( + "errors" + + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/x/ident" +) + +var ( + // EmptyTagMetadataResolver is empty tags iter metadata resolver. + EmptyTagMetadataResolver = NewTagsIterMetadataResolver(ident.EmptyTagIterator) + + // ErrUnknownTagMetadataResolverType is unknown tag metadata resolver type error. + ErrUnknownTagMetadataResolverType = errors.New("unknown tag metadata resolver type") +) + +type tagResolverType uint8 + +const ( + tagResolverEncodedTags tagResolverType = iota + tagResolverIter + tagResolverTags +) + +// TagMetadataResolver represents metadata resolver. +type TagMetadataResolver struct { + resolverType tagResolverType + encodedTags ts.EncodedTags + tagsIter ident.TagIterator + tags ident.Tags +} + +// NewEncodedTagsMetadataResolver returns metadata resolver which accepts encoded tags. +func NewEncodedTagsMetadataResolver(encodedTags ts.EncodedTags) TagMetadataResolver { + return TagMetadataResolver{ + resolverType: tagResolverEncodedTags, + encodedTags: encodedTags, + } +} + +// NewTagsIterMetadataResolver returns metadata resolver which accepts tags iterator. +func NewTagsIterMetadataResolver(tagsIter ident.TagIterator) TagMetadataResolver { + return TagMetadataResolver{ + resolverType: tagResolverIter, + tagsIter: tagsIter, + } +} + +// NewTagsMetadataResolver returns metadata resolver which accepts tags. +func NewTagsMetadataResolver(tags ident.Tags) TagMetadataResolver { + return TagMetadataResolver{ + resolverType: tagResolverTags, + tags: tags, + } +} + +// Resolve resolves doc.Metadata from seriesID. +func (t TagMetadataResolver) Resolve(id ident.ID) (doc.Metadata, error) { + switch t.resolverType { + case tagResolverEncodedTags: + return FromSeriesIDAndEncodedTags(id.Bytes(), t.encodedTags) + case tagResolverIter: + // NB(r): Rewind so we record the tag iterator from the beginning. + tagsIter := t.tagsIter.Duplicate() + + seriesMetadata, err := FromSeriesIDAndTagIter(id, tagsIter) + tagsIter.Close() + if err != nil { + return doc.Metadata{}, err + } + return seriesMetadata, nil + case tagResolverTags: + return FromSeriesIDAndTags(id, t.tags) + default: + return doc.Metadata{}, ErrUnknownTagMetadataResolverType + } +} diff --git a/src/dbnode/storage/index/convert/tag_resolver_test.go b/src/dbnode/storage/index/convert/tag_resolver_test.go new file mode 100644 index 0000000000..8d66b06f94 --- /dev/null +++ b/src/dbnode/storage/index/convert/tag_resolver_test.go @@ -0,0 +1,75 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package convert + +import ( + "encoding/base64" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/x/ident" +) + +//nolint:lll +const encodedTagSample = "dScMAAgAX19uYW1lX18GAGRpc2tpbwQAYXJjaAMAeDY0CgBkYXRhY2VudGVyCgB1cy13ZXN0LTJjCABob3N0bmFtZQcAaG9zdF83OAsAbWVhc3VyZW1lbnQFAHJlYWRzAgBvcwsAVWJ1bnR1MTUuMTAEAHJhY2sCADg3BgByZWdpb24JAHVzLXdlc3QtMgcAc2VydmljZQIAMTETAHNlcnZpY2VfZW52aXJvbm1lbnQKAHByb2R1Y3Rpb24PAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQIAU0Y=" + +func TestEncodedTagsMetadataResolver(t *testing.T) { + encodedTags, err := base64.StdEncoding.DecodeString(encodedTagSample) + require.NoError(t, err) + + sut := NewEncodedTagsMetadataResolver(encodedTags) + metadata, err := sut.Resolve(ident.StringID("testId")) + require.NoError(t, err) + require.Equal(t, "testId", string(metadata.ID)) + assertFieldValue(t, metadata, "__name__", "diskio") +} + +func TestIterMetadataResolver(t *testing.T) { + sut := NewTagsIterMetadataResolver(ident.NewTagsIterator(ident.NewTags( + ident.StringTag("name", "foo"), + ident.StringTag("host", "localhost")))) + + metadata, err := sut.Resolve(ident.StringID("testId")) + require.NoError(t, err) + require.Equal(t, "testId", string(metadata.ID)) + assertFieldValue(t, metadata, "name", "foo") + assertFieldValue(t, metadata, "host", "localhost") +} + +func TestTagsMetadataResolver(t *testing.T) { + sut := NewTagsMetadataResolver(ident.NewTags( + ident.StringTag("__name__", "foo"), + ident.StringTag("name", "bar"))) + + metadata, err := sut.Resolve(ident.StringID("testId")) + require.NoError(t, err) + require.Equal(t, "testId", string(metadata.ID)) + assertFieldValue(t, metadata, "name", "bar") + assertFieldValue(t, metadata, "__name__", "foo") +} + +func assertFieldValue(t *testing.T, metadata doc.Metadata, expectedFieldName, expectedValue string) { + val, ok := metadata.Get([]byte(expectedFieldName)) + require.True(t, ok) + require.Equal(t, expectedValue, string(val)) +} diff --git a/src/dbnode/storage/index_queue_forward_write_test.go b/src/dbnode/storage/index_queue_forward_write_test.go index 1e1d1ca4e7..2cfdc53e27 100644 --- a/src/dbnode/storage/index_queue_forward_write_test.go +++ b/src/dbnode/storage/index_queue_forward_write_test.go @@ -477,7 +477,7 @@ func writeToShard( tag := ident.Tag{Name: ident.StringID(id), Value: ident.StringID("")} idTags := ident.NewTags(tag) iter := ident.NewTagsIterator(idTags) - seriesWrite, err := shard.WriteTagged(ctx, ident.StringID(id), iter, now, + seriesWrite, err := shard.WriteTagged(ctx, ident.StringID(id), idxconvert.NewTagsIterMetadataResolver(iter), now, 1.0, xtime.Second, nil, series.WriteOptions{ TruncateType: series.TypeBlock, TransformOptions: series.WriteTransformOptions{ diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index ac05a4acb0..803f511d97 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" @@ -712,7 +713,7 @@ func (n *dbNamespace) Write( func (n *dbNamespace) WriteTagged( ctx context.Context, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, @@ -736,7 +737,7 @@ func (n *dbNamespace) WriteTagged( TruncateType: n.opts.TruncateType(), SchemaDesc: nsCtx.Schema, } - seriesWrite, err := shard.WriteTagged(ctx, id, tags, timestamp, + seriesWrite, err := shard.WriteTagged(ctx, id, tagResolver, timestamp, value, unit, annotation, opts) n.metrics.writeTagged.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return seriesWrite, err diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 88d749731f..9feca7710b 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" @@ -217,7 +218,7 @@ func TestNamespaceReadOnlyRejectWrites(t *testing.T) { require.EqualError(t, err, errNamespaceReadOnly.Error()) require.False(t, seriesWrite.WasWritten) - seriesWrite, err = ns.WriteTagged(ctx, id, ident.EmptyTagIterator, now, 0, xtime.Second, nil) + seriesWrite, err = ns.WriteTagged(ctx, id, convert.EmptyTagMetadataResolver, now, 0, xtime.Second, nil) require.EqualError(t, err, errNamespaceReadOnly.Error()) require.False(t, seriesWrite.WasWritten) } @@ -1335,23 +1336,23 @@ func TestNamespaceIndexInsert(t *testing.T) { TruncateType: truncateType, } shard.EXPECT(). - WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, + WriteTagged(ctx, ident.NewIDMatcher("a"), convert.EmptyTagMetadataResolver, now, 1.0, xtime.Second, nil, opts). Return(SeriesWrite{WasWritten: true}, nil) shard.EXPECT(). - WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, + WriteTagged(ctx, ident.NewIDMatcher("a"), convert.EmptyTagMetadataResolver, now, 1.0, xtime.Second, nil, opts). Return(SeriesWrite{WasWritten: false}, nil) ns.shards[testShardIDs[0].ID()] = shard seriesWrite, err := ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + convert.EmptyTagMetadataResolver, now, 1.0, xtime.Second, nil) require.NoError(t, err) require.True(t, seriesWrite.WasWritten) seriesWrite, err = ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + convert.EmptyTagMetadataResolver, now, 1.0, xtime.Second, nil) require.NoError(t, err) require.False(t, seriesWrite.WasWritten) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 336f867d8c..30198bd934 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -70,16 +70,15 @@ const ( ) var ( - errShardEntryNotFound = errors.New("shard entry not found") - errShardNotOpen = errors.New("shard is not open") - errShardAlreadyTicking = errors.New("shard is already ticking") - errShardClosingTickTerminated = errors.New("shard is closing, terminating tick") - errShardInvalidPageToken = errors.New("shard could not unmarshal page token") - errNewShardEntryTagsTypeInvalid = errors.New("new shard entry options error: tags type invalid") - errShardIsNotBootstrapped = errors.New("shard is not bootstrapped") - errShardAlreadyBootstrapped = errors.New("shard is already bootstrapped") - errFlushStateIsNotInitialized = errors.New("shard flush state is not initialized") - errTriedToLoadNilSeries = errors.New("tried to load nil series into shard") + errShardEntryNotFound = errors.New("shard entry not found") + errShardNotOpen = errors.New("shard is not open") + errShardAlreadyTicking = errors.New("shard is already ticking") + errShardClosingTickTerminated = errors.New("shard is closing, terminating tick") + errShardInvalidPageToken = errors.New("shard could not unmarshal page token") + errShardIsNotBootstrapped = errors.New("shard is not bootstrapped") + errShardAlreadyBootstrapped = errors.New("shard is already bootstrapped") + errFlushStateIsNotInitialized = errors.New("shard flush state is not initialized") + errTriedToLoadNilSeries = errors.New("tried to load nil series into shard") // ErrDatabaseLoadLimitHit is the error returned when the database load limit // is hit or exceeded. @@ -115,42 +114,6 @@ const ( dbShardStateClosing ) -type tagsArgType uint - -const ( - // nolint: varcheck, unused - tagsInvalidArg tagsArgType = iota - tagsIterArg - tagsArg -) - -// tagsArgOptions is a union type that allows -// callers to pass either an ident.TagIterator or -// ident.Tags based on what access they have to -type tagsArgOptions struct { - arg tagsArgType - tagsIter ident.TagIterator - tags ident.Tags -} - -func newTagsIterArg( - tagsIter ident.TagIterator, -) tagsArgOptions { - return tagsArgOptions{ - arg: tagsIterArg, - tagsIter: tagsIter, - } -} - -func newTagsArg( - tags ident.Tags, -) tagsArgOptions { - return tagsArgOptions{ - arg: tagsArg, - tags: tags, - } -} - type dbShard struct { sync.RWMutex block.DatabaseBlockRetriever @@ -499,7 +462,7 @@ func (s *dbShard) OnRetrieveBlock( return } - entry, err = s.newShardEntry(id, newTagsIterArg(tags)) + entry, err = s.newShardEntry(id, convert.NewTagsIterMetadataResolver(tags)) if err != nil { // should never happen instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), @@ -883,14 +846,14 @@ func (s *dbShard) purgeExpiredSeries(expiredEntries []*lookup.Entry) { func (s *dbShard) WriteTagged( ctx context.Context, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, annotation []byte, wOpts series.WriteOptions, ) (SeriesWrite, error) { - return s.writeAndIndex(ctx, id, tags, timestamp, + return s.writeAndIndex(ctx, id, tagResolver, timestamp, value, unit, annotation, wOpts, true) } @@ -903,14 +866,14 @@ func (s *dbShard) Write( annotation []byte, wOpts series.WriteOptions, ) (SeriesWrite, error) { - return s.writeAndIndex(ctx, id, ident.EmptyTagIterator, timestamp, + return s.writeAndIndex(ctx, id, convert.EmptyTagMetadataResolver, timestamp, value, unit, annotation, wOpts, false) } func (s *dbShard) writeAndIndex( ctx context.Context, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, @@ -929,7 +892,7 @@ func (s *dbShard) writeAndIndex( // If no entry and we are not writing new series asynchronously. if !writable && !opts.writeNewSeriesAsync { // Avoid double lookup by enqueueing insert immediately. - result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{ + result, err := s.insertSeriesAsyncBatched(id, tagResolver, dbShardInsertAsyncOptions{ hasPendingIndexing: shouldReverseIndex, pendingIndex: dbShardPendingIndex{ timestamp: timestamp, @@ -944,7 +907,7 @@ func (s *dbShard) writeAndIndex( result.wg.Wait() // Retrieve the inserted entry - entry, err = s.writableSeries(id, tags) + entry, err = s.writableSeries(id, tagResolver) if err != nil { return SeriesWrite{}, err } @@ -1003,7 +966,7 @@ func (s *dbShard) writeAndIndex( annotationClone.AppendAll(annotation) } - result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{ + result, err := s.insertSeriesAsyncBatched(id, tagResolver, dbShardInsertAsyncOptions{ hasPendingWrite: true, pendingWrite: dbShardPendingWrite{ timestamp: timestamp, @@ -1063,7 +1026,7 @@ func (s *dbShard) SeriesRefResolver( return entry, nil } - entry, err = s.newShardEntry(id, newTagsIterArg(tags)) + entry, err = s.newShardEntry(id, convert.NewTagsIterMetadataResolver(tags)) if err != nil { return nil, err } @@ -1157,7 +1120,7 @@ func (s *dbShard) lookupEntryWithLock(id ident.ID) (*lookup.Entry, *list.Element return elem.Value.(*lookup.Entry), elem, nil } -func (s *dbShard) writableSeries(id ident.ID, tags ident.TagIterator) (*lookup.Entry, error) { +func (s *dbShard) writableSeries(id ident.ID, tagResolver convert.TagMetadataResolver) (*lookup.Entry, error) { for { entry, err := s.retrieveWritableSeries(id) if entry != nil { @@ -1168,7 +1131,7 @@ func (s *dbShard) writableSeries(id ident.ID, tags ident.TagIterator) (*lookup.E } // Not inserted, attempt a batched insert - result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{}) + result, err := s.insertSeriesAsyncBatched(id, tagResolver, dbShardInsertAsyncOptions{}) if err != nil { return nil, err } @@ -1210,7 +1173,7 @@ func (s *dbShard) retrieveWritableSeries(id ident.ID) (*lookup.Entry, error) { func (s *dbShard) newShardEntry( id ident.ID, - tagsArgOpts tagsArgOptions, + tagResolver convert.TagMetadataResolver, ) (*lookup.Entry, error) { // NB(r): As documented in storage/series.DatabaseSeries the series IDs // and metadata are garbage collected, hence we cast the ID to a BytesID @@ -1227,28 +1190,10 @@ func (s *dbShard) newShardEntry( seriesMetadata doc.Metadata err error ) - switch tagsArgOpts.arg { - case tagsIterArg: - // NB(r): Rewind so we record the tag iterator from the beginning. - tagsIter := tagsArgOpts.tagsIter.Duplicate() - - // Pass nil for the identifier pool because the pool will force us to use an array - // with a large capacity to store the tags. Since these tags are long-lived, it's - // better to allocate an array of the exact size to save memory. - seriesMetadata, err = convert.FromSeriesIDAndTagIter(id, tagsIter) - tagsIter.Close() - if err != nil { - return nil, err - } - - case tagsArg: - seriesMetadata, err = convert.FromSeriesIDAndTags(id, tagsArgOpts.tags) - if err != nil { - return nil, err - } - default: - return nil, errNewShardEntryTagsTypeInvalid + seriesMetadata, err = tagResolver.Resolve(id) + if err != nil { + return nil, err } // Use the same bytes as the series metadata for the ID. @@ -1345,10 +1290,10 @@ func (s *dbShard) insertSeriesForIndexingAsyncBatched( func (s *dbShard) insertSeriesAsyncBatched( id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, opts dbShardInsertAsyncOptions, ) (insertAsyncResult, error) { - entry, err := s.newShardEntry(id, newTagsIterArg(tags)) + entry, err := s.newShardEntry(id, tagResolver) if err != nil { return insertAsyncResult{}, err } @@ -1381,12 +1326,12 @@ type insertSyncOptions struct { func (s *dbShard) insertSeriesSync( id ident.ID, - tagsArgOpts tagsArgOptions, + tagResolver convert.TagMetadataResolver, opts insertSyncOptions, ) (*lookup.Entry, error) { // NB(r): Create new shard entry outside of write lock to reduce // time using write lock. - newEntry, err := s.newShardEntry(id, tagsArgOpts) + newEntry, err := s.newShardEntry(id, tagResolver) if err != nil { // should never happen instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), @@ -2137,7 +2082,7 @@ func (s *dbShard) loadBlock( if entry == nil { // Synchronously insert to avoid waiting for the insert queue which could potentially // delay the insert. - entry, err = s.insertSeriesSync(id, newTagsArg(tags), + entry, err = s.insertSeriesSync(id, convert.NewTagsMetadataResolver(tags), insertSyncOptions{ // NB(r): Because insertSyncIncReaderWriterCount is used here we // don't need to explicitly increment the reader/writer count and it diff --git a/src/dbnode/storage/shard_index_test.go b/src/dbnode/storage/shard_index_test.go index 761919aa1f..4dedab856a 100644 --- a/src/dbnode/storage/shard_index_test.go +++ b/src/dbnode/storage/shard_index_test.go @@ -28,6 +28,8 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" + "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -36,7 +38,6 @@ import ( "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/storage/series" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -78,13 +79,13 @@ func TestShardInsertNamespaceIndex(t *testing.T) { defer ctx.Close() seriesWrite, err := shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + convert.NewTagsIterMetadataResolver(ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value")))), now, 1.0, xtime.Second, nil, series.WriteOptions{}) require.NoError(t, err) require.True(t, seriesWrite.WasWritten) seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + convert.NewTagsIterMetadataResolver(ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value")))), now, 2.0, xtime.Second, nil, series.WriteOptions{}) require.NoError(t, err) require.True(t, seriesWrite.WasWritten) @@ -122,7 +123,7 @@ func TestShardAsyncInsertMarkIndexedForBlockStart(t *testing.T) { // write first time seriesWrite, err := shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + convert.NewTagsIterMetadataResolver(ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value")))), now, 1.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) @@ -170,7 +171,7 @@ func TestShardAsyncIndexIfExpired(t *testing.T) { defer ctx.Close() seriesWrite, err := shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + convert.NewTagsIterMetadataResolver(ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value")))), now, 1.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) @@ -197,7 +198,7 @@ func TestShardAsyncIndexIfExpired(t *testing.T) { // ensure we would need to index next block because it's expired nextWriteTime := now.Add(blockSize) seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + convert.NewTagsIterMetadataResolver(ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value")))), nextWriteTime, 2.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) diff --git a/src/dbnode/storage/shard_ref_count_test.go b/src/dbnode/storage/shard_ref_count_test.go index 7e046832d4..4afdc52d61 100644 --- a/src/dbnode/storage/shard_ref_count_test.go +++ b/src/dbnode/storage/shard_ref_count_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3/src/x/clock" @@ -189,15 +190,18 @@ func testShardWriteTaggedSyncRefCount(t *testing.T, idx NamespaceIndex) { ctx := context.NewBackground() defer ctx.Close() - seriesWrite, err := shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, now, 1.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err := shard.WriteTagged(ctx, ident.StringID("foo"), + convert.EmptyTagMetadataResolver, now, 1.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, now, 2.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), + convert.EmptyTagMetadataResolver, now, 2.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, now, 3.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), + convert.EmptyTagMetadataResolver, now, 3.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) @@ -213,15 +217,18 @@ func testShardWriteTaggedSyncRefCount(t *testing.T, idx NamespaceIndex) { // write already inserted series' next := now.Add(time.Minute) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, next, 1.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("foo"), + convert.EmptyTagMetadataResolver, next, 1.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, next, 2.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), + convert.EmptyTagMetadataResolver, next, 2.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, next, 3.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), + convert.EmptyTagMetadataResolver, next, 3.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) @@ -412,7 +419,7 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx NamespaceIndex, nowFn f defer ctx.Close() seriesWrite, err := shard.WriteTagged(ctx, ident.StringID("foo"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil, series.WriteOptions{}) + convert.EmptyTagMetadataResolver, now, 1.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) assert.True(t, seriesWrite.NeedsIndex) @@ -420,7 +427,7 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx NamespaceIndex, nowFn f seriesWrite.PendingIndexInsert.Entry.OnIndexSeries.OnIndexFinalize(idx.BlockStartForWriteTime(now)) seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), - ident.EmptyTagIterator, now, 2.0, xtime.Second, nil, series.WriteOptions{}) + convert.EmptyTagMetadataResolver, now, 2.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) assert.True(t, seriesWrite.NeedsIndex) @@ -428,7 +435,7 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx NamespaceIndex, nowFn f seriesWrite.PendingIndexInsert.Entry.OnIndexSeries.OnIndexFinalize(idx.BlockStartForWriteTime(now)) seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), - ident.EmptyTagIterator, now, 3.0, xtime.Second, nil, series.WriteOptions{}) + convert.EmptyTagMetadataResolver, now, 3.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) assert.True(t, seriesWrite.NeedsIndex) @@ -453,15 +460,18 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx NamespaceIndex, nowFn f // write already inserted series' next := now.Add(time.Minute) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, next, 1.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("foo"), + convert.EmptyTagMetadataResolver, next, 1.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, next, 2.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("bar"), + convert.EmptyTagMetadataResolver, next, 2.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) - seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, next, 3.0, xtime.Second, nil, series.WriteOptions{}) + seriesWrite, err = shard.WriteTagged(ctx, ident.StringID("baz"), + convert.EmptyTagMetadataResolver, next, 3.0, xtime.Second, nil, series.WriteOptions{}) assert.NoError(t, err) assert.True(t, seriesWrite.WasWritten) diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index c8d89d4ed2..d4bcc45e18 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/ts" @@ -1436,7 +1437,7 @@ func TestPurgeExpiredSeriesWriteAfterPurging(t *testing.T) { s.EXPECT().Tick(gomock.Any(), gomock.Any()).Do(func(interface{}, interface{}) { // Emulate a write taking place and staying open just after tick for this series var err error - entry, err = shard.writableSeries(id, ident.EmptyTagIterator) + entry, err = shard.writableSeries(id, convert.EmptyTagMetadataResolver) require.NoError(t, err) }).Return(series.TickResult{}, series.ErrSeriesAllDatapointsExpired) @@ -1774,7 +1775,7 @@ func TestShardNewInvalidShardEntry(t *testing.T) { iter.EXPECT().Close(), ) - _, err := shard.newShardEntry(ident.StringID("abc"), newTagsIterArg(iter)) + _, err := shard.newShardEntry(ident.StringID("abc"), convert.NewTagsIterMetadataResolver(iter)) require.Error(t, err) } @@ -1785,7 +1786,7 @@ func TestShardNewValidShardEntry(t *testing.T) { shard := testDatabaseShard(t, DefaultTestOptions()) defer shard.Close() - _, err := shard.newShardEntry(ident.StringID("abc"), newTagsIterArg(ident.EmptyTagIterator)) + _, err := shard.newShardEntry(ident.StringID("abc"), convert.NewTagsIterMetadataResolver(ident.EmptyTagIterator)) require.NoError(t, err) } @@ -1819,7 +1820,7 @@ func TestShardNewEntryDoesNotAlterIDOrTags(t *testing.T) { Times(1). Return(ident.NewTagsIterator(seriesTags)) - entry, err := shard.newShardEntry(id, newTagsIterArg(iter)) + entry, err := shard.newShardEntry(id, convert.NewTagsIterMetadataResolver(iter)) require.NoError(t, err) shard.Lock() diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index c2a162544a..bf437bfbf1 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -42,6 +42,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/storage/limits/permits" "github.com/m3db/m3/src/dbnode/storage/repair" @@ -245,17 +246,17 @@ func (mr *MockDatabaseMockRecorder) Write(ctx, namespace, id, timestamp, value, } // WriteTagged mocks base method -func (m *MockDatabase) WriteTagged(ctx context.Context, namespace, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockDatabase) WriteTagged(ctx context.Context, namespace, id ident.ID, tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteTagged", ctx, namespace, id, tags, timestamp, value, unit, annotation) + ret := m.ctrl.Call(m, "WriteTagged", ctx, namespace, id, tagResolver, timestamp, value, unit, annotation) ret0, _ := ret[0].(error) return ret0 } // WriteTagged indicates an expected call of WriteTagged -func (mr *MockDatabaseMockRecorder) WriteTagged(ctx, namespace, id, tags, timestamp, value, unit, annotation interface{}) *gomock.Call { +func (mr *MockDatabaseMockRecorder) WriteTagged(ctx, namespace, id, tagResolver, timestamp, value, unit, annotation interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockDatabase)(nil).WriteTagged), ctx, namespace, id, tags, timestamp, value, unit, annotation) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockDatabase)(nil).WriteTagged), ctx, namespace, id, tagResolver, timestamp, value, unit, annotation) } // BatchWriter mocks base method @@ -684,17 +685,17 @@ func (mr *MockdatabaseMockRecorder) Write(ctx, namespace, id, timestamp, value, } // WriteTagged mocks base method -func (m *Mockdatabase) WriteTagged(ctx context.Context, namespace, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *Mockdatabase) WriteTagged(ctx context.Context, namespace, id ident.ID, tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteTagged", ctx, namespace, id, tags, timestamp, value, unit, annotation) + ret := m.ctrl.Call(m, "WriteTagged", ctx, namespace, id, tagResolver, timestamp, value, unit, annotation) ret0, _ := ret[0].(error) return ret0 } // WriteTagged indicates an expected call of WriteTagged -func (mr *MockdatabaseMockRecorder) WriteTagged(ctx, namespace, id, tags, timestamp, value, unit, annotation interface{}) *gomock.Call { +func (mr *MockdatabaseMockRecorder) WriteTagged(ctx, namespace, id, tagResolver, timestamp, value, unit, annotation interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*Mockdatabase)(nil).WriteTagged), ctx, namespace, id, tags, timestamp, value, unit, annotation) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*Mockdatabase)(nil).WriteTagged), ctx, namespace, id, tagResolver, timestamp, value, unit, annotation) } // BatchWriter mocks base method @@ -1547,18 +1548,18 @@ func (mr *MockdatabaseNamespaceMockRecorder) Write(ctx, id, timestamp, value, un } // WriteTagged mocks base method -func (m *MockdatabaseNamespace) WriteTagged(ctx context.Context, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) (SeriesWrite, error) { +func (m *MockdatabaseNamespace) WriteTagged(ctx context.Context, id ident.ID, tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) (SeriesWrite, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteTagged", ctx, id, tags, timestamp, value, unit, annotation) + ret := m.ctrl.Call(m, "WriteTagged", ctx, id, tagResolver, timestamp, value, unit, annotation) ret0, _ := ret[0].(SeriesWrite) ret1, _ := ret[1].(error) return ret0, ret1 } // WriteTagged indicates an expected call of WriteTagged -func (mr *MockdatabaseNamespaceMockRecorder) WriteTagged(ctx, id, tags, timestamp, value, unit, annotation interface{}) *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) WriteTagged(ctx, id, tagResolver, timestamp, value, unit, annotation interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockdatabaseNamespace)(nil).WriteTagged), ctx, id, tags, timestamp, value, unit, annotation) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockdatabaseNamespace)(nil).WriteTagged), ctx, id, tagResolver, timestamp, value, unit, annotation) } // QueryIDs mocks base method @@ -2099,18 +2100,18 @@ func (mr *MockdatabaseShardMockRecorder) Write(ctx, id, timestamp, value, unit, } // WriteTagged mocks base method -func (m *MockdatabaseShard) WriteTagged(ctx context.Context, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte, wOpts series.WriteOptions) (SeriesWrite, error) { +func (m *MockdatabaseShard) WriteTagged(ctx context.Context, id ident.ID, tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit time0.Unit, annotation []byte, wOpts series.WriteOptions) (SeriesWrite, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteTagged", ctx, id, tags, timestamp, value, unit, annotation, wOpts) + ret := m.ctrl.Call(m, "WriteTagged", ctx, id, tagResolver, timestamp, value, unit, annotation, wOpts) ret0, _ := ret[0].(SeriesWrite) ret1, _ := ret[1].(error) return ret0, ret1 } // WriteTagged indicates an expected call of WriteTagged -func (mr *MockdatabaseShardMockRecorder) WriteTagged(ctx, id, tags, timestamp, value, unit, annotation, wOpts interface{}) *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) WriteTagged(ctx, id, tagResolver, timestamp, value, unit, annotation, wOpts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockdatabaseShard)(nil).WriteTagged), ctx, id, tags, timestamp, value, unit, annotation, wOpts) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockdatabaseShard)(nil).WriteTagged), ctx, id, tagResolver, timestamp, value, unit, annotation, wOpts) } // ReadEncoded mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 3dd0edf356..cdcd3f20d7 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/storage/limits/permits" "github.com/m3db/m3/src/dbnode/storage/repair" @@ -115,7 +116,7 @@ type Database interface { ctx context.Context, namespace ident.ID, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, @@ -368,7 +369,7 @@ type databaseNamespace interface { WriteTagged( ctx context.Context, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, @@ -527,7 +528,7 @@ type databaseShard interface { WriteTagged( ctx context.Context, id ident.ID, - tags ident.TagIterator, + tagResolver convert.TagMetadataResolver, timestamp time.Time, value float64, unit xtime.Unit, diff --git a/src/dbnode/ts/writes/types.go b/src/dbnode/ts/writes/types.go index 121a483511..d61fa6efef 100644 --- a/src/dbnode/ts/writes/types.go +++ b/src/dbnode/ts/writes/types.go @@ -64,10 +64,6 @@ type BatchWrite struct { // object first, cannot use the Series provided by the caller as it // is missing important fields like Tags.) Write Write - // Not used by the commitlog, provided by the caller (since the request - // is usually coming from over the wire) and is superseded by the Tags - // in Write.Series which will get set by the Shard object. - TagIter ident.TagIterator // EncodedTags is used by the commit log, but also held onto as a reference // here so that it can be returned to the pool after the write to commit log // completes (since the Write.Series gets overwritten in SetOutcome so can't @@ -115,7 +111,6 @@ type BatchWriter interface { AddTagged( originalIndex int, id ident.ID, - tags ident.TagIterator, encodedTags ts.EncodedTags, timestamp time.Time, value float64, diff --git a/src/dbnode/ts/writes/write_batch.go b/src/dbnode/ts/writes/write_batch.go index 9a0a3b77b8..05d433c1b7 100644 --- a/src/dbnode/ts/writes/write_batch.go +++ b/src/dbnode/ts/writes/write_batch.go @@ -97,7 +97,7 @@ func (b *writeBatch) Add( annotation []byte, ) error { write, err := newBatchWriterWrite( - originalIndex, b.ns, id, nil, nil, timestamp, value, unit, annotation) + originalIndex, b.ns, id, nil, timestamp, value, unit, annotation) if err != nil { return err } @@ -108,7 +108,6 @@ func (b *writeBatch) Add( func (b *writeBatch) AddTagged( originalIndex int, id ident.ID, - tagIter ident.TagIterator, encodedTags ts.EncodedTags, timestamp time.Time, value float64, @@ -116,7 +115,7 @@ func (b *writeBatch) AddTagged( annotation []byte, ) error { write, err := newBatchWriterWrite( - originalIndex, b.ns, id, tagIter, encodedTags, timestamp, value, unit, annotation) + originalIndex, b.ns, id, encodedTags, timestamp, value, unit, annotation) if err != nil { return err } @@ -249,15 +248,14 @@ func newBatchWriterWrite( originalIndex int, namespace ident.ID, id ident.ID, - tagIter ident.TagIterator, encodedTags ts.EncodedTags, timestamp time.Time, value float64, unit xtime.Unit, annotation []byte, ) (BatchWrite, error) { - write := tagIter == nil && encodedTags == nil - writeTagged := tagIter != nil && encodedTags != nil + write := encodedTags == nil + writeTagged := encodedTags != nil if !write && !writeTagged { return BatchWrite{}, errTagsAndEncodedTagsRequired } @@ -276,7 +274,6 @@ func newBatchWriterWrite( Unit: unit, Annotation: annotation, }, - TagIter: tagIter, EncodedTags: encodedTags, OriginalIndex: originalIndex, }, nil diff --git a/src/dbnode/ts/writes/write_batch_mock.go b/src/dbnode/ts/writes/write_batch_mock.go index b4c6a15c1c..c1730c18b4 100644 --- a/src/dbnode/ts/writes/write_batch_mock.go +++ b/src/dbnode/ts/writes/write_batch_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/ts/writes/types.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -73,17 +73,17 @@ func (mr *MockWriteBatchMockRecorder) Add(originalIndex, id, timestamp, value, u } // AddTagged mocks base method -func (m *MockWriteBatch) AddTagged(originalIndex int, id ident.ID, tags ident.TagIterator, encodedTags ts.EncodedTags, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockWriteBatch) AddTagged(originalIndex int, id ident.ID, encodedTags ts.EncodedTags, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddTagged", originalIndex, id, tags, encodedTags, timestamp, value, unit, annotation) + ret := m.ctrl.Call(m, "AddTagged", originalIndex, id, encodedTags, timestamp, value, unit, annotation) ret0, _ := ret[0].(error) return ret0 } // AddTagged indicates an expected call of AddTagged -func (mr *MockWriteBatchMockRecorder) AddTagged(originalIndex, id, tags, encodedTags, timestamp, value, unit, annotation interface{}) *gomock.Call { +func (mr *MockWriteBatchMockRecorder) AddTagged(originalIndex, id, encodedTags, timestamp, value, unit, annotation interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTagged", reflect.TypeOf((*MockWriteBatch)(nil).AddTagged), originalIndex, id, tags, encodedTags, timestamp, value, unit, annotation) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTagged", reflect.TypeOf((*MockWriteBatch)(nil).AddTagged), originalIndex, id, encodedTags, timestamp, value, unit, annotation) } // SetFinalizeEncodedTagsFn mocks base method @@ -262,17 +262,17 @@ func (mr *MockBatchWriterMockRecorder) Add(originalIndex, id, timestamp, value, } // AddTagged mocks base method -func (m *MockBatchWriter) AddTagged(originalIndex int, id ident.ID, tags ident.TagIterator, encodedTags ts.EncodedTags, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockBatchWriter) AddTagged(originalIndex int, id ident.ID, encodedTags ts.EncodedTags, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddTagged", originalIndex, id, tags, encodedTags, timestamp, value, unit, annotation) + ret := m.ctrl.Call(m, "AddTagged", originalIndex, id, encodedTags, timestamp, value, unit, annotation) ret0, _ := ret[0].(error) return ret0 } // AddTagged indicates an expected call of AddTagged -func (mr *MockBatchWriterMockRecorder) AddTagged(originalIndex, id, tags, encodedTags, timestamp, value, unit, annotation interface{}) *gomock.Call { +func (mr *MockBatchWriterMockRecorder) AddTagged(originalIndex, id, encodedTags, timestamp, value, unit, annotation interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTagged", reflect.TypeOf((*MockBatchWriter)(nil).AddTagged), originalIndex, id, tags, encodedTags, timestamp, value, unit, annotation) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTagged", reflect.TypeOf((*MockBatchWriter)(nil).AddTagged), originalIndex, id, encodedTags, timestamp, value, unit, annotation) } // SetFinalizeEncodedTagsFn mocks base method diff --git a/src/dbnode/ts/writes/write_batch_test.go b/src/dbnode/ts/writes/write_batch_test.go index 7f5e7976ee..ead971e33f 100644 --- a/src/dbnode/ts/writes/write_batch_test.go +++ b/src/dbnode/ts/writes/write_batch_test.go @@ -141,7 +141,6 @@ func TestBatchWriterAddTaggedAndIter(t *testing.T) { writeBatch.AddTagged( i, write.id, - write.tagIter, write.encodedTags(t).Bytes(), write.timestamp, write.value, @@ -160,7 +159,6 @@ func TestBatchWriterSetSeries(t *testing.T) { writeBatch.AddTagged( i, write.id, - write.tagIter, write.encodedTags(t).Bytes(), write.timestamp, write.value, @@ -294,7 +292,6 @@ func TestBatchWriterFinalizer(t *testing.T) { writeBatch.AddTagged( i, write.id, - write.tagIter, write.encodedTags(t).Bytes(), write.timestamp, write.value,