[dbnode][m3ninx] Rename doc.Document -> doc.Metadata (#3062)
nbroyles authored Jan 4, 2021
1 parent fe46c2c commit a5e31f3
Showing 95 changed files with 702 additions and 680 deletions.
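The rename itself is mechanical: every value previously typed as doc.Document becomes doc.Metadata, with its fields (ID, Fields) unchanged. A minimal before/after sketch, using illustrative values rather than anything taken from the diff:

// Before this commit:
//   md := doc.Document{ID: []byte("foo"), Fields: []doc.Field{...}}
// After this commit:
md := doc.Metadata{
	ID: []byte("foo"),
	Fields: []doc.Field{
		{Name: []byte("city"), Value: []byte("new_york")},
	},
}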
2 changes: 1 addition & 1 deletion src/dbnode/generated-source-files.mk
@@ -302,7 +302,7 @@ genny-list-all: \
genny-list-storage-id:
cd $(m3x_package_path) && make genny-pooled-elem-list-gen \
pkg=storage \
- value_type=doc.Document \
+ value_type=doc.Metadata \
rename_type_prefix=id \
rename_type_middle=ID \
target_package=github.com/m3db/m3/src/dbnode/storage
14 changes: 7 additions & 7 deletions src/dbnode/integration/fs_bootstrap_index_volume_type_test.go
@@ -73,7 +73,7 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) {
ID: ident.StringID("foo"),
Tags: ident.NewTags(ident.StringTag("city", "new_york"), ident.StringTag("foo", "foo")),
}
- fooDoc := doc.Document{
+ fooDoc := doc.Metadata{
ID: fooSeries.ID.Bytes(),
Fields: []doc.Field{
doc.Field{Name: []byte("city"), Value: []byte("new_york")},
@@ -85,7 +85,7 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) {
ID: ident.StringID("bar"),
Tags: ident.NewTags(ident.StringTag("city", "new_jersey")),
}
- barDoc := doc.Document{
+ barDoc := doc.Metadata{
ID: barSeries.ID.Bytes(),
Fields: []doc.Field{
doc.Field{Name: []byte("city"), Value: []byte("new_jersey")},
@@ -96,7 +96,7 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) {
ID: ident.StringID("baz"),
Tags: ident.NewTags(ident.StringTag("city", "seattle")),
}
- bazDoc := doc.Document{
+ bazDoc := doc.Metadata{
ID: bazSeries.ID.Bytes(),
Fields: []doc.Field{
doc.Field{Name: []byte("city"), Value: []byte("seattle")},
@@ -107,7 +107,7 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) {
ID: ident.StringID("qux"),
Tags: ident.NewTags(ident.StringTag("city", "new_harmony")),
}
- quxDoc := doc.Document{
+ quxDoc := doc.Metadata{
ID: quxSeries.ID.Bytes(),
Fields: []doc.Field{
doc.Field{Name: []byte("city"), Value: []byte("new_harmony")},
@@ -118,7 +118,7 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) {
ID: ident.StringID("dux"),
Tags: ident.NewTags(ident.StringTag("city", "los_angeles")),
}
- duxDoc := doc.Document{
+ duxDoc := doc.Metadata{
ID: duxSeries.ID.Bytes(),
Fields: []doc.Field{
doc.Field{Name: []byte("city"), Value: []byte("los_angeles")},
@@ -163,12 +163,12 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) {
Start: now,
},
})
- defaultIndexDocs := []doc.Document{
+ defaultIndexDocs := []doc.Metadata{
fooDoc,
barDoc,
bazDoc,
}
- extraIndexDocs := []doc.Document{
+ extraIndexDocs := []doc.Metadata{
quxDoc,
duxDoc,
}
2 changes: 1 addition & 1 deletion src/dbnode/integration/integration.go
@@ -450,7 +450,7 @@ func writeTestIndexDataToDisk(
indexVolumeType idxpersist.IndexVolumeType,
blockStart time.Time,
shards []uint32,
- docs []doc.Document,
+ docs []doc.Metadata,
) error {
blockSize := md.Options().IndexOptions().BlockSize()
fsOpts := storageOpts.CommitLogOptions().FilesystemOptions()
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/merger.go
@@ -240,7 +240,7 @@ func (m *merger) Merge(
ctx.Reset()
err = mergeWith.ForEachRemaining(
ctx, blockStart,
- func(seriesMetadata doc.Document, mergeWithData block.FetchBlockResult) error {
+ func(seriesMetadata doc.Metadata, mergeWithData block.FetchBlockResult) error {
segmentReaders = segmentReaders[:0]
segmentReaders = appendBlockReadersToSegmentReaders(segmentReaders, mergeWithData.Blocks)

3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/merger_test.go
@@ -703,7 +703,8 @@ func mockMergeWithFromData(
Start: startTime,
Blocks: []xio.BlockReader{blockReaderFromData(data, segReader, startTime, blockSize)},
}
- fn(doc.Document{ID: id.Bytes()}, br)
+ err := fn(doc.Metadata{ID: id.Bytes()}, br)
+ require.NoError(t, err)
}
}
})
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/types.go
@@ -597,7 +597,7 @@ type BlockRetrieverOptions interface {

// ForEachRemainingFn is the function that is run on each of the remaining
// series of the merge target that did not intersect with the fileset.
- type ForEachRemainingFn func(seriesMetadata doc.Document, data block.FetchBlockResult) error
+ type ForEachRemainingFn func(seriesMetadata doc.Metadata, data block.FetchBlockResult) error

// MergeWith is an interface that the fs merger uses to merge data with.
type MergeWith interface {
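For reference, callers hand a ForEachRemainingFn to MergeWith.ForEachRemaining, as the tests further down do. A minimal sketch of a callback under the renamed signature, assuming ctx, blockStart, mergeWith, and nsCtx are already in scope:

var ids [][]byte
err := mergeWith.ForEachRemaining(ctx, blockStart,
	func(seriesMetadata doc.Metadata, data block.FetchBlockResult) error {
		// Record each remaining series by ID; data.Blocks carries the
		// block readers for that series.
		ids = append(ids, seriesMetadata.ID)
		return nil
	}, nsCtx)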
10 changes: 5 additions & 5 deletions src/dbnode/persist/types.go
@@ -40,7 +40,7 @@ var errReusableTagIteratorRequired = errors.New("reusable tags iterator is requi
// Metadata is metadata for a time series, it can
// have several underlying sources.
type Metadata struct {
- metadata doc.Document
+ metadata doc.Metadata
id ident.ID
tags ident.Tags
tagsIter ident.TagIterator
@@ -55,11 +55,11 @@ type MetadataOptions struct {
}

// NewMetadata returns a new metadata struct from series metadata.
- // Note: because doc.Document has no pools for finalization we do not
+ // Note: because doc.Metadata has no pools for finalization we do not
// take MetadataOptions here, in future if we have pools or
// some other shared options that Metadata needs we will add it to this
// constructor as well.
- func NewMetadata(metadata doc.Document) Metadata {
+ func NewMetadata(metadata doc.Metadata) Metadata {
return Metadata{metadata: metadata}
}

@@ -336,7 +336,7 @@ const (
type SeriesMetadataType uint8

const (
- // SeriesDocumentType means the metadata is in doc.Document form.
+ // SeriesDocumentType means the metadata is in doc.Metadata form.
SeriesDocumentType SeriesMetadataType = iota
// SeriesIDAndEncodedTagsType means the metadata is in IDAndEncodedTags form.
SeriesIDAndEncodedTagsType
@@ -351,7 +351,7 @@ type IDAndEncodedTags struct {
// SeriesMetadata captures different representations of series metadata and
// the ownership status of the underlying memory.
type SeriesMetadata struct {
- Document doc.Document
+ Document doc.Metadata
IDAndEncodedTags IDAndEncodedTags
Type SeriesMetadataType
LifeTime SeriesMetadataLifeTime
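Taken together, these hunks mean series metadata flows through the persist package as doc.Metadata in two forms: wrapped by NewMetadata, or tagged inside the SeriesMetadata union. A minimal sketch of both, with a hypothetical series ID:

func buildMetadata() (persist.Metadata, persist.SeriesMetadata) {
	md := doc.Metadata{ID: []byte("series-id")} // hypothetical ID

	// NewMetadata takes no MetadataOptions because doc.Metadata has no
	// pools requiring finalization (per the constructor comment above).
	m := persist.NewMetadata(md)

	// The union form tags which representation is populated.
	sm := persist.SeriesMetadata{
		Document: md,
		Type:     persist.SeriesDocumentType,
	}
	return m, sm
}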
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
@@ -721,9 +721,9 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock(

func (s *fileSystemSource) readNextEntryAndMaybeIndex(
r fs.DataFileSetReader,
- batch []doc.Document,
+ batch []doc.Metadata,
builder *result.IndexBuilder,
- ) ([]doc.Document, error) {
+ ) ([]doc.Metadata, error) {
// If performing index run, then simply read the metadata and add to segment.
id, tagsIter, _, _, err := r.ReadMetadata()
if err != nil {
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
@@ -1012,9 +1012,9 @@ func (s *peersSource) processReaders(

func (s *peersSource) readNextEntryAndMaybeIndex(
r fs.DataFileSetReader,
- batch []doc.Document,
+ batch []doc.Metadata,
builder *result.IndexBuilder,
- ) ([]doc.Document, error) {
+ ) ([]doc.Metadata, error) {
// If performing index run, then simply read the metadata and add to segment.
id, tagsIter, _, _, err := r.ReadMetadata()
if err != nil {
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/result/result_index.go
@@ -91,7 +91,7 @@ func NewIndexBuilder(builder segment.DocumentsBuilder) *IndexBuilder {
}

// FlushBatch flushes a batch of documents to the underlying segment builder.
- func (b *IndexBuilder) FlushBatch(batch []doc.Document) ([]doc.Document, error) {
+ func (b *IndexBuilder) FlushBatch(batch []doc.Metadata) ([]doc.Metadata, error) {
if len(batch) == 0 {
// Last flush might not have any docs enqueued
return batch, nil
@@ -119,7 +119,7 @@ func (b *IndexBuilder) FlushBatch(batch []doc.Document) ([]doc.Document, error)
}

// Reset docs batch for reuse
- var empty doc.Document
+ var empty doc.Metadata
for i := range batch {
batch[i] = empty
}
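The fs and peers bootstrappers above drive FlushBatch in a fill/flush loop: append doc.Metadata values until the batch is full, flush, and reuse the returned zeroed slice. A rough sketch of that pattern, assuming a batchSize constant, a metadatas slice, and a builder *result.IndexBuilder:

batch := make([]doc.Metadata, 0, batchSize)
for _, md := range metadatas {
	batch = append(batch, md)
	if len(batch) < batchSize {
		continue
	}
	var err error
	// FlushBatch adds the docs to the segment builder and returns the
	// batch with entries reset to zero values, ready for reuse.
	if batch, err = builder.FlushBatch(batch); err != nil {
		return err
	}
}
// Final flush; FlushBatch is a no-op on an empty batch.
if _, err := builder.FlushBatch(batch); err != nil {
	return err
}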
11 changes: 6 additions & 5 deletions src/dbnode/storage/fs_merge_with_mem_test.go
@@ -172,7 +172,7 @@ func TestForEachRemaining(t *testing.T) {

mergeWith := newFSMergeWithMem(shard, retriever, dirtySeries, dirtySeriesToWrite)

- var forEachCalls []doc.Document
+ var forEachCalls []doc.Metadata
shard.EXPECT().
FetchBlocksForColdFlush(gomock.Any(), ident.NewIDMatcher("id0"),
xtime.UnixNano(0).ToTime(), version+1, gomock.Any()).
@@ -181,10 +181,11 @@ func TestForEachRemaining(t *testing.T) {
FetchBlocksForColdFlush(gomock.Any(), ident.NewIDMatcher("id1"),
xtime.UnixNano(0).ToTime(), version+1, gomock.Any()).
Return(result, nil)
- mergeWith.ForEachRemaining(ctx, 0, func(seriesMetadata doc.Document, result block.FetchBlockResult) error {
+ err := mergeWith.ForEachRemaining(ctx, 0, func(seriesMetadata doc.Metadata, result block.FetchBlockResult) error {
forEachCalls = append(forEachCalls, seriesMetadata)
return nil
}, nsCtx)
+ require.NoError(t, err)
require.Len(t, forEachCalls, 2)
assert.Equal(t, id0.Bytes(), forEachCalls[0].ID)
assert.Equal(t, id1.Bytes(), forEachCalls[1].ID)
@@ -209,7 +210,7 @@ func TestForEachRemaining(t *testing.T) {
FetchBlocksForColdFlush(gomock.Any(), ident.NewIDMatcher("id4"),
xtime.UnixNano(1).ToTime(), version+1, gomock.Any()).
Return(result, nil)
- err = mergeWith.ForEachRemaining(ctx, 1, func(seriesMetadata doc.Document, result block.FetchBlockResult) error {
+ err = mergeWith.ForEachRemaining(ctx, 1, func(seriesMetadata doc.Metadata, result block.FetchBlockResult) error {
forEachCalls = append(forEachCalls, seriesMetadata)
return nil
}, nsCtx)
@@ -224,7 +225,7 @@ func TestForEachRemaining(t *testing.T) {
Return(result, nil)

// Test call with bad function execution.
- err = mergeWith.ForEachRemaining(ctx, 4, func(seriesMetadata doc.Document, result block.FetchBlockResult) error {
+ err = mergeWith.ForEachRemaining(ctx, 4, func(seriesMetadata doc.Metadata, result block.FetchBlockResult) error {
return errors.New("bad")
}, nsCtx)
assert.Error(t, err)
@@ -241,7 +242,7 @@ func addDirtySeries(
seriesList = newIDList(nil)
dirtySeriesToWrite[start] = seriesList
}
- element := seriesList.PushBack(doc.Document{ID: id.Bytes()})
+ element := seriesList.PushBack(doc.Metadata{ID: id.Bytes()})

dirtySeries.Set(idAndBlockStart{blockStart: start, id: id.Bytes()}, element)
}
14 changes: 7 additions & 7 deletions src/dbnode/storage/id_list_gen.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/dbnode/storage/index.go
@@ -685,7 +685,7 @@ func (i *nsIndex) writeBatches(
// doc is valid. Add potential forward writes to the forwardWriteBatch.
batch.ForEach(
func(idx int, entry index.WriteBatchEntry,
- d doc.Document, _ index.WriteBatchEntryResult) {
+ d doc.Metadata, _ index.WriteBatchEntryResult) {
total++

if len(i.doNotIndexWithFields) != 0 {
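Here batch is an index.WriteBatch whose ForEach visitor now receives doc.Metadata. A minimal caller-side sketch under the new signature, counting entries only (the surrounding code additionally validates each doc before indexing):

var total int
batch.ForEach(func(idx int, entry index.WriteBatchEntry,
	d doc.Metadata, _ index.WriteBatchEntryResult) {
	// Visit every pending write in the batch.
	total++
})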
8 changes: 4 additions & 4 deletions src/dbnode/storage/index/aggregate_results.go
@@ -104,7 +104,7 @@ func (r *aggregatedResults) Reset(
r.Unlock()
}

- func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) {
+ func (r *aggregatedResults) AddDocuments(batch []doc.Metadata) (int, int, error) {
r.Lock()
err := r.addDocumentsBatchWithLock(batch)
size := r.resultsMap.Len()
@@ -162,7 +162,7 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
}

func (r *aggregatedResults) addDocumentsBatchWithLock(
- batch []doc.Document,
+ batch []doc.Metadata,
) error {
for _, doc := range batch {
switch r.aggregateOpts.Type {
@@ -193,7 +193,7 @@ func (r *aggregatedResults) addDocumentsBatchWithLock(
}

func (r *aggregatedResults) addDocumentTermsWithLock(
- document doc.Document,
+ document doc.Metadata,
) error {
for _, field := range document.Fields {
if err := r.addTermWithLock(field.Name); err != nil {
@@ -233,7 +233,7 @@ func (r *aggregatedResults) addTermWithLock(
}

func (r *aggregatedResults) addDocumentWithLock(
- document doc.Document,
+ document doc.Metadata,
) error {
for _, field := range document.Fields {
if err := r.addFieldWithLock(field.Name, field.Value); err != nil {
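Both aggregation paths walk the renamed type's Fields slice: term aggregation keeps only field names, while field-and-value aggregation keeps both. A minimal sketch of that access pattern with a hypothetical document:

d := doc.Metadata{
	ID: []byte("series"),
	Fields: []doc.Field{
		{Name: []byte("city"), Value: []byte("seattle")},
	},
}
for _, field := range d.Fields {
	// addDocumentTermsWithLock uses field.Name only;
	// addDocumentWithLock uses both field.Name and field.Value.
	_, _ = field.Name, field.Value
}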