diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go index de23a9efbf..217033516a 100644 --- a/src/dbnode/generated/mocks/generate.go +++ b/src/dbnode/generated/mocks/generate.go @@ -20,7 +20,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode) -//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" +//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go" //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest" //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go" diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index cd40dd2d47..1e6675f0bb 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter) +// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor) // Copyright (c) 2020 Uber Technologies, Inc. // @@ -325,15 +325,12 @@ func (mr *MockDataFileSetReaderMockRecorder) StreamingEnabled() *gomock.Call { } // StreamingRead mocks base method -func (m *MockDataFileSetReader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32, error) { +func (m *MockDataFileSetReader) StreamingRead() (StreamedDataEntry, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StreamingRead") - ret0, _ := ret[0].(ident.BytesID) - ret1, _ := ret[1].(ts.EncodedTags) - ret2, _ := ret[2].([]byte) - ret3, _ := ret[3].(uint32) - ret4, _ := ret[4].(error) - return ret0, ret1, ret2, ret3, ret4 + ret0, _ := ret[0].(StreamedDataEntry) + ret1, _ := ret[1].(error) + return ret0, ret1 } // StreamingRead indicates an expected call of StreamingRead @@ -1405,3 +1402,52 @@ func (mr *MockStreamingWriterMockRecorder) WriteAll(arg0, arg1, arg2, arg3 inter mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteAll", reflect.TypeOf((*MockStreamingWriter)(nil).WriteAll), arg0, arg1, arg2, arg3) } + +// MockDataEntryProcessor is a mock of DataEntryProcessor interface +type MockDataEntryProcessor struct { + ctrl *gomock.Controller + recorder *MockDataEntryProcessorMockRecorder +} + +// MockDataEntryProcessorMockRecorder is the mock recorder for MockDataEntryProcessor +type MockDataEntryProcessorMockRecorder struct { + mock *MockDataEntryProcessor +} + +// NewMockDataEntryProcessor creates a new mock instance +func NewMockDataEntryProcessor(ctrl *gomock.Controller) *MockDataEntryProcessor { + mock := &MockDataEntryProcessor{ctrl: ctrl} + mock.recorder = &MockDataEntryProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDataEntryProcessor) EXPECT() *MockDataEntryProcessorMockRecorder { + return m.recorder +} + +// ProcessEntry mocks base method +func (m *MockDataEntryProcessor) ProcessEntry(arg0 StreamedDataEntry) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessEntry", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// ProcessEntry indicates an expected call of ProcessEntry +func (mr *MockDataEntryProcessorMockRecorder) ProcessEntry(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessEntry", reflect.TypeOf((*MockDataEntryProcessor)(nil).ProcessEntry), arg0) +} + +// SetEntriesCount mocks base method +func (m *MockDataEntryProcessor) SetEntriesCount(arg0 int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetEntriesCount", arg0) +} + +// SetEntriesCount indicates an expected call of SetEntriesCount +func (mr *MockDataEntryProcessorMockRecorder) SetEntriesCount(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEntriesCount", reflect.TypeOf((*MockDataEntryProcessor)(nil).SetEntriesCount), arg0) +} diff --git a/src/dbnode/persist/fs/msgpack/decoder.go b/src/dbnode/persist/fs/msgpack/decoder.go index 532a3c66b3..a07d691c80 100644 --- a/src/dbnode/persist/fs/msgpack/decoder.go +++ b/src/dbnode/persist/fs/msgpack/decoder.go @@ -505,7 +505,7 @@ func (dec *Decoder) decodeWideEntry( var checksum int64 if compare == 0 { // NB: need to compute hash before freeing entry bytes. - checksum = dec.hasher.HashIndexEntry(entry) + checksum = dec.hasher.HashIndexEntry(entry.ID, entry.EncodedTags, entry.DataChecksum) return schema.WideEntry{ IndexEntry: entry, MetadataChecksum: checksum, @@ -522,7 +522,7 @@ func (dec *Decoder) decodeWideEntry( return emptyWideEntry, MismatchLookupStatus } - // compareID must have been before the curret entry.ID, so this + // compareID must have been before the current entry.ID, so this // ID will not be matched. return emptyWideEntry, NotFoundLookupStatus } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index d897832c31..3a02ee0811 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -33,7 +33,6 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs/msgpack" "github.com/m3db/m3/src/dbnode/persist/schema" - "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/checked" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" @@ -373,22 +372,22 @@ func (r *reader) readIndexAndSortByOffsetAsc() error { return nil } -func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32, error) { +func (r *reader) StreamingRead() (StreamedDataEntry, error) { if !r.streamingEnabled { - return nil, nil, nil, 0, errStreamingRequired + return StreamedDataEntry{}, errStreamingRequired } if r.entriesRead >= r.entries { - return nil, nil, nil, 0, io.EOF + return StreamedDataEntry{}, io.EOF } entry, err := r.decoder.DecodeIndexEntry(nil) if err != nil { - return nil, nil, nil, 0, err + return StreamedDataEntry{}, err } if entry.Offset+entry.Size > int64(len(r.dataMmap.Bytes)) { - return nil, nil, nil, 0, fmt.Errorf( + return StreamedDataEntry{}, fmt.Errorf( "attempt to read beyond data file size (offset=%d, size=%d, file size=%d)", entry.Offset, entry.Size, len(r.dataMmap.Bytes)) } @@ -397,7 +396,7 @@ func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32, // NB(r): _must_ check the checksum against known checksum as the data // file might not have been verified if we haven't read through the file yet. if entry.DataChecksum != int64(digest.Checksum(data)) { - return nil, nil, nil, 0, errSeekChecksumMismatch + return StreamedDataEntry{}, errSeekChecksumMismatch } r.streamingData = append(r.streamingData[:0], data...) @@ -406,7 +405,14 @@ func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32, r.entriesRead++ - return r.streamingID, r.streamingTags, r.streamingData, uint32(entry.DataChecksum), nil + dataEntry := StreamedDataEntry{ + ID: r.streamingID, + EncodedTags: r.streamingTags, + Data: r.streamingData, + DataChecksum: uint32(entry.DataChecksum), + } + + return dataEntry, nil } func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index 4978ffe7eb..4a8e98d679 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -573,16 +573,19 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) { func readData(t *testing.T, reader DataFileSetReader) (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) { if reader.StreamingEnabled() { - id, encodedTags, data, checksum, err := reader.StreamingRead() + entry, err := reader.StreamingRead() + if err != nil { + return nil, nil, nil, 0, err + } var tags = ident.EmptyTagIterator - if len(encodedTags) > 0 { + if len(entry.EncodedTags) > 0 { tagsDecoder := testTagDecoderPool.Get() - tagsDecoder.Reset(checkedBytes(encodedTags)) + tagsDecoder.Reset(checkedBytes(entry.EncodedTags)) require.NoError(t, tagsDecoder.Err()) tags = tagsDecoder } - return id, tags, checked.NewBytes(data, nil), checksum, err + return entry.ID, tags, checked.NewBytes(entry.Data, nil), entry.DataChecksum, err } return reader.Read() diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index d359e74f27..84f1c72fb4 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -153,9 +153,9 @@ type DataFileSetReader interface { // StreamingRead returns the next unpooled id, encodedTags, data, checksum // values ordered by id, or error, will return io.EOF at end of volume. - // Can only by used when DataReaderOpenOptions.StreamingEnabled is enabled. - // Note: the returned id, encodedTags and data get invalidated on the next call to StreamingRead. - StreamingRead() (id ident.BytesID, encodedTags ts.EncodedTags, data []byte, checksum uint32, err error) + // Can only by used when DataReaderOpenOptions.StreamingEnabled is true. + // Note: the returned data gets invalidated on the next call to StreamingRead. + StreamingRead() (StreamedDataEntry, error) // ReadMetadata returns the next id and metadata or error, will return io.EOF at end of volume. // Use either Read or ReadMetadata to progress through a volume, but not both. @@ -216,7 +216,7 @@ type DataFileSetSeeker interface { // SeekIndexEntry returns the IndexEntry for the specified ID. This can be useful // ahead of issuing a number of seek requests so that the seek requests can be - // made in order. The returned IndexEntry can also be passed to SeekUsingIndexEntry + // made in order. The returned IndexEntry can also be passed to SeekByIndexEntry // to prevent duplicate index lookups. SeekIndexEntry(id ident.ID, resources ReusableSeekerResources) (IndexEntry, error) @@ -677,3 +677,23 @@ type IndexClaimsManager interface { blockStart time.Time, ) (int, error) } + +// StreamedDataEntry contains the data of single entry returned by streaming method(s). +// The underlying data slices are reused and invalidated on every read. +type StreamedDataEntry struct { + ID ident.BytesID + EncodedTags ts.EncodedTags + Data []byte + DataChecksum uint32 +} + +// NewReaderFn creates a new DataFileSetReader. +type NewReaderFn func(bytesPool pool.CheckedBytesPool, opts Options) (DataFileSetReader, error) + +// DataEntryProcessor processes StreamedDataEntries. +type DataEntryProcessor interface { + // SetEntriesCount sets the number of entries to be processed. + SetEntriesCount(int) + // ProcessEntry processes a single StreamedDataEntry. + ProcessEntry(StreamedDataEntry) error +} diff --git a/src/dbnode/persist/schema/index_entry_hasher.go b/src/dbnode/persist/schema/index_entry_hasher.go index f4ea40160c..d89fc9abf8 100644 --- a/src/dbnode/persist/schema/index_entry_hasher.go +++ b/src/dbnode/persist/schema/index_entry_hasher.go @@ -22,6 +22,9 @@ package schema import ( "github.com/cespare/xxhash/v2" + + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/ident" ) type xxHasher struct{} @@ -31,10 +34,15 @@ func NewXXHasher() IndexEntryHasher { return xxHasher{} } -func (h xxHasher) HashIndexEntry(e IndexEntry) int64 { +func (x xxHasher) HashIndexEntry( + id ident.BytesID, + encodedTags ts.EncodedTags, + dataChecksum int64, +) int64 { hash := uint64(7) - hash = 31*hash + xxhash.Sum64(e.ID) - hash = 31*hash + xxhash.Sum64(e.EncodedTags) - hash = 31*hash + uint64(e.DataChecksum) + hash = 31*hash + xxhash.Sum64(id) + hash = 31*hash + xxhash.Sum64(encodedTags) + hash = 31*hash + uint64(dataChecksum) + return int64(hash) } diff --git a/src/dbnode/persist/schema/index_entry_hasher_test.go b/src/dbnode/persist/schema/index_entry_hasher_test.go index 9d3acead0c..f3e4568c56 100644 --- a/src/dbnode/persist/schema/index_entry_hasher_test.go +++ b/src/dbnode/persist/schema/index_entry_hasher_test.go @@ -65,6 +65,7 @@ func TestIndexEntryHash(t *testing.T) { hasher := NewXXHasher() for _, tt := range tests { - assert.Equal(t, tt.expected, hasher.HashIndexEntry(tt.entry)) + e := tt.entry + assert.Equal(t, tt.expected, hasher.HashIndexEntry(e.ID, e.EncodedTags, e.DataChecksum)) } } diff --git a/src/dbnode/persist/schema/types.go b/src/dbnode/persist/schema/types.go index a13968818b..7a4ae7d273 100644 --- a/src/dbnode/persist/schema/types.go +++ b/src/dbnode/persist/schema/types.go @@ -22,6 +22,8 @@ package schema import ( "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/ident" ) // MajorVersion is the major schema version for a set of fileset files, @@ -90,9 +92,15 @@ type WideEntryFilter func(entry WideEntry) (bool, error) // IndexEntryHasher hashes an index entry. type IndexEntryHasher interface { - // HashIndexEntry computes a hash value for this IndexEntry using its ID, tags, + // HashIndexEntry computes a hash value for this index entry using its ID, tags, // and the computed data checksum. - HashIndexEntry(e IndexEntry) int64 + // NB: not passing the whole IndexEntry because of linter message: + // "hugeParam: e is heavy (88 bytes); consider passing it by pointer". + HashIndexEntry( + id ident.BytesID, + encodedTags ts.EncodedTags, + dataChecksum int64, + ) int64 } // IndexSummary stores a summary of an index entry to lookup. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 6e2f810449..2608d0bcf5 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -43,7 +43,6 @@ import ( "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" - "github.com/m3db/m3/src/x/pool" xtime "github.com/m3db/m3/src/x/time" "github.com/opentracing/opentracing-go" @@ -58,7 +57,6 @@ const ( type newIteratorFn func(opts commitlog.IteratorOpts) ( iter commitlog.Iterator, corruptFiles []commitlog.ErrorWithPath, err error) type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error) -type newReaderFn func(bytesPool pool.CheckedBytesPool, opts fs.Options) (fs.DataFileSetReader, error) type commitLogSource struct { opts Options @@ -70,7 +68,7 @@ type commitLogSource struct { newIteratorFn newIteratorFn snapshotFilesFn snapshotFilesFn - newReaderFn newReaderFn + newReaderFn fs.NewReaderFn metrics commitLogSourceMetrics // Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index c811175b59..c97d717708 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1386,7 +1386,7 @@ func (i *nsIndex) WideQuery( opts, ) - // NB: result should be fina.ized here, regardless of outcome + // NB: result should be finalized here, regardless of outcome // to prevent deadlocking while waiting on channel close. defer results.Finalize() queryOpts := opts.ToQueryOptions() diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index f8e39f7084..5b1d8e0034 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -174,6 +174,7 @@ type dbShard struct { filesetPathsBeforeFn filesetPathsBeforeFn deleteFilesFn deleteFilesFn snapshotFilesFn snapshotFilesFn + newReaderFn fs.NewReaderFn sleepFn func(time.Duration) identifierPool ident.Pool contextPool context.Pool @@ -317,6 +318,7 @@ func newDatabaseShard( deleteFilesFn: fs.DeleteFiles, snapshotFilesFn: fs.SnapshotFiles, sleepFn: time.Sleep, + newReaderFn: fs.NewReader, identifierPool: opts.IdentifierPool(), contextPool: opts.ContextPool(), flushState: newShardFlushState(), @@ -580,7 +582,7 @@ func iterateBatchSize(elemsLen int) int { if elemsLen < shardIterateBatchMinSize { return shardIterateBatchMinSize } - t := math.Ceil(float64(shardIterateBatchPercent) * float64(elemsLen)) + t := math.Ceil(shardIterateBatchPercent * float64(elemsLen)) return int(math.Max(shardIterateBatchMinSize, t)) } @@ -1941,7 +1943,7 @@ func (s *dbShard) FetchBlocksMetadataV2( if err != nil { return nil, nil, err } - return result, PageToken(data), nil + return result, data, nil } // Otherwise we move on to the previous block. @@ -2806,6 +2808,66 @@ func (s *dbShard) LatestVolume(blockStart time.Time) (int, error) { return s.namespaceReaderMgr.latestVolume(s.shard, blockStart) } +func (s *dbShard) ScanData( + blockStart time.Time, + processor fs.DataEntryProcessor, +) error { + latestVolume, err := s.LatestVolume(blockStart) + if err != nil { + return err + } + + reader, err := s.newReaderFn(s.opts.BytesPool(), s.opts.CommitLogOptions().FilesystemOptions()) + if err != nil { + return err + } + + openOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: s.namespace.ID(), + Shard: s.ID(), + BlockStart: blockStart, + VolumeIndex: latestVolume, + }, + FileSetType: persist.FileSetFlushType, + StreamingEnabled: true, + } + + if err := reader.Open(openOpts); err != nil { + return err + } + + readEntriesErr := s.scanDataWithReader(reader, processor) + // Always close the reader regardless of if failed, but + // make sure to propagate if an error occurred closing the reader too. + readCloseErr := reader.Close() + if err := readEntriesErr; err != nil { + return readEntriesErr + } + return readCloseErr +} + +func (s *dbShard) scanDataWithReader( + reader fs.DataFileSetReader, + processor fs.DataEntryProcessor, +) error { + processor.SetEntriesCount(reader.Entries()) + + for { + entry, err := reader.StreamingRead() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + + if err := processor.ProcessEntry(entry); err != nil { + return err + } + } +} + func (s *dbShard) logFlushResult(r dbShardFlushResult) { s.logger.Debug("shard flush outcome", zap.Uint32("shard", s.ID()), diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index bceaf17151..8c0a237556 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -23,6 +23,7 @@ package storage import ( "errors" "fmt" + "io" "io/ioutil" "os" "strconv" @@ -1872,15 +1873,18 @@ func TestShardAggregateTiles(t *testing.T) { sourceShard := testDatabaseShard(t, testOpts) defer assert.NoError(t, sourceShard.Close()) - reader0, volume0 := getMockReader(ctrl, t, sourceShard, start, true) + reader0, volume0 := getMockReader( + ctrl, t, sourceShard, start, nil) reader0.EXPECT().Entries().Return(firstSourceBlockEntries) secondSourceBlockStart := start.Add(sourceBlockSize) - reader1, volume1 := getMockReader(ctrl, t, sourceShard, secondSourceBlockStart, true) + reader1, volume1 := getMockReader( + ctrl, t, sourceShard, secondSourceBlockStart, nil) reader1.EXPECT().Entries().Return(secondSourceBlockEntries) thirdSourceBlockStart := secondSourceBlockStart.Add(sourceBlockSize) - reader2, volume2 := getMockReader(ctrl, t, sourceShard, thirdSourceBlockStart, false) + reader2, volume2 := getMockReader( + ctrl, t, sourceShard, thirdSourceBlockStart, fs.ErrCheckpointFileNotFound) blockReaders := []fs.DataFileSetReader{reader0, reader1, reader2} sourceBlockVolumes := []shardBlockVolume{ @@ -1950,12 +1954,58 @@ func TestShardAggregateTilesVerifySliceLengths(t *testing.T) { require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)") } +func TestShardScan(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + var ( + blockSize = time.Hour + start = time.Now().Truncate(blockSize) + testOpts = DefaultTestOptions() + ) + + shard := testDatabaseShard(t, testOpts) + defer assert.NoError(t, shard.Close()) + + shardEntries := []fs.StreamedDataEntry{ + { + ID: ident.BytesID("id1"), + EncodedTags: ts.EncodedTags("tags1"), + Data: []byte{1}, + DataChecksum: 11, + }, + { + ID: ident.BytesID("id2"), + EncodedTags: ts.EncodedTags("tags2"), + Data: []byte{2}, + DataChecksum: 22, + }, + } + + processor := fs.NewMockDataEntryProcessor(ctrl) + processor.EXPECT().SetEntriesCount(len(shardEntries)) + + reader, _ := getMockReader(ctrl, t, shard, start, nil) + reader.EXPECT().Entries().Return(len(shardEntries)) + for _, entry := range shardEntries { + reader.EXPECT().StreamingRead().Return(entry, nil) + processor.EXPECT().ProcessEntry(entry) + } + reader.EXPECT().StreamingRead().Return(fs.StreamedDataEntry{}, io.EOF) + + shard.newReaderFn = func(pool.CheckedBytesPool, fs.Options) (fs.DataFileSetReader, error) { + return reader, nil + } + + require.NoError(t, shard.ScanData(start, processor)) +} + func getMockReader( ctrl *gomock.Controller, t *testing.T, shard *dbShard, blockStart time.Time, - dataFilesetFlushed bool, + openError error, ) (*fs.MockDataFileSetReader, int) { latestSourceVolume, err := shard.LatestVolume(blockStart) require.NoError(t, err) @@ -1972,11 +2022,11 @@ func getMockReader( } reader := fs.NewMockDataFileSetReader(ctrl) - if dataFilesetFlushed { + if openError == nil { reader.EXPECT().Open(openOpts).Return(nil) reader.EXPECT().Close() } else { - reader.EXPECT().Open(openOpts).Return(fs.ErrCheckpointFileNotFound) + reader.EXPECT().Open(openOpts).Return(openError) } return reader, latestSourceVolume diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 550fd18483..1cc099d797 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1917,6 +1917,20 @@ func (mr *MockShardMockRecorder) BootstrapState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapState", reflect.TypeOf((*MockShard)(nil).BootstrapState)) } +// ScanData mocks base method +func (m *MockShard) ScanData(blockStart time.Time, processor fs.DataEntryProcessor) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ScanData", blockStart, processor) + ret0, _ := ret[0].(error) + return ret0 +} + +// ScanData indicates an expected call of ScanData +func (mr *MockShardMockRecorder) ScanData(blockStart, processor interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanData", reflect.TypeOf((*MockShard)(nil).ScanData), blockStart, processor) +} + // MockdatabaseShard is a mock of databaseShard interface type MockdatabaseShard struct { ctrl *gomock.Controller @@ -1996,6 +2010,20 @@ func (mr *MockdatabaseShardMockRecorder) BootstrapState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapState", reflect.TypeOf((*MockdatabaseShard)(nil).BootstrapState)) } +// ScanData mocks base method +func (m *MockdatabaseShard) ScanData(blockStart time.Time, processor fs.DataEntryProcessor) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ScanData", blockStart, processor) + ret0, _ := ret[0].(error) + return ret0 +} + +// ScanData indicates an expected call of ScanData +func (mr *MockdatabaseShardMockRecorder) ScanData(blockStart, processor interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanData", reflect.TypeOf((*MockdatabaseShard)(nil).ScanData), blockStart, processor) +} + // OnEvictedFromWiredList mocks base method func (m *MockdatabaseShard) OnEvictedFromWiredList(id ident.ID, blockStart time.Time) { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index e27c25a658..d3aa8ef7a1 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -508,6 +508,13 @@ type Shard interface { // BootstrapState returns the shards' bootstrap state. BootstrapState() BootstrapState + + // ScanData performs a "full table scan" on the given block, + // calling processor function on every entry read. + ScanData( + blockStart time.Time, + processor fs.DataEntryProcessor, + ) error } type databaseShard interface { diff --git a/src/x/test/hash/hash.go b/src/x/test/hash/hash.go index f9ebe67aac..8a2cb397f2 100644 --- a/src/x/test/hash/hash.go +++ b/src/x/test/hash/hash.go @@ -26,6 +26,8 @@ import ( "testing" "github.com/m3db/m3/src/dbnode/persist/schema" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/ident" "github.com/stretchr/testify/require" ) @@ -44,8 +46,12 @@ func NewParsedIndexHasher(t *testing.T) schema.IndexEntryHasher { return &parsedIndexHasher{t: t, re: re} } -func (h *parsedIndexHasher) HashIndexEntry(e schema.IndexEntry) int64 { - matched := h.re.FindAllString(string(e.ID), -1) +func (h *parsedIndexHasher) HashIndexEntry( + id ident.BytesID, + encodedTags ts.EncodedTags, + dataChecksum int64, +) int64 { + matched := h.re.FindAllString(string(id), -1) if len(matched) == 0 { return 0 }