[dbnode] Implement shard.ScanData (#2981)
linasm authored Dec 7, 2020
1 parent 75ce6c3 commit 3175f2f
Showing 16 changed files with 291 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
@@ -20,7 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
62 changes: 54 additions & 8 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions src/dbnode/persist/fs/msgpack/decoder.go
@@ -505,7 +505,7 @@ func (dec *Decoder) decodeWideEntry(
var checksum int64
if compare == 0 {
// NB: need to compute hash before freeing entry bytes.
checksum = dec.hasher.HashIndexEntry(entry)
checksum = dec.hasher.HashIndexEntry(entry.ID, entry.EncodedTags, entry.DataChecksum)
return schema.WideEntry{
IndexEntry: entry,
MetadataChecksum: checksum,
@@ -522,7 +522,7 @@
return emptyWideEntry, MismatchLookupStatus
}

// compareID must have been before the curret entry.ID, so this
// compareID must have been before the current entry.ID, so this
// ID will not be matched.
return emptyWideEntry, NotFoundLookupStatus
}
22 changes: 14 additions & 8 deletions src/dbnode/persist/fs/read.go
@@ -33,7 +33,6 @@ import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/checked"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
@@ -373,22 +372,22 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
return nil
}

func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32, error) {
func (r *reader) StreamingRead() (StreamedDataEntry, error) {
if !r.streamingEnabled {
return nil, nil, nil, 0, errStreamingRequired
return StreamedDataEntry{}, errStreamingRequired
}

if r.entriesRead >= r.entries {
return nil, nil, nil, 0, io.EOF
return StreamedDataEntry{}, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
if err != nil {
return nil, nil, nil, 0, err
return StreamedDataEntry{}, err
}

if entry.Offset+entry.Size > int64(len(r.dataMmap.Bytes)) {
return nil, nil, nil, 0, fmt.Errorf(
return StreamedDataEntry{}, fmt.Errorf(
"attempt to read beyond data file size (offset=%d, size=%d, file size=%d)",
entry.Offset, entry.Size, len(r.dataMmap.Bytes))
}
@@ -397,7 +396,7 @@ func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32,
// NB(r): _must_ check the checksum against known checksum as the data
// file might not have been verified if we haven't read through the file yet.
if entry.DataChecksum != int64(digest.Checksum(data)) {
return nil, nil, nil, 0, errSeekChecksumMismatch
return StreamedDataEntry{}, errSeekChecksumMismatch
}

r.streamingData = append(r.streamingData[:0], data...)
@@ -406,7 +405,14 @@

r.entriesRead++

return r.streamingID, r.streamingTags, r.streamingData, uint32(entry.DataChecksum), nil
dataEntry := StreamedDataEntry{
ID: r.streamingID,
EncodedTags: r.streamingTags,
Data: r.streamingData,
DataChecksum: uint32(entry.DataChecksum),
}

return dataEntry, nil
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
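
For context, a minimal sketch of how a caller might consume the new StreamingRead contract. This is not part of the commit: the package name and the drainVolume helper are hypothetical, and it assumes a DataFileSetReader opened with DataReaderOpenOptions.StreamingEnabled set.

package example // hypothetical

import (
	"io"

	"github.com/m3db/m3/src/dbnode/persist/fs"
)

// drainVolume iterates a fileset volume via StreamingRead until io.EOF.
// The returned StreamedDataEntry reuses its underlying slices on every
// call, so anything retained across iterations must be copied.
func drainVolume(reader fs.DataFileSetReader) ([][]byte, error) {
	var ids [][]byte
	for {
		entry, err := reader.StreamingRead()
		if err == io.EOF {
			return ids, nil
		}
		if err != nil {
			return nil, err
		}
		// Copy the ID out of the reused buffer before the next read.
		ids = append(ids, append([]byte(nil), entry.ID...))
	}
}

Returning a single StreamedDataEntry struct instead of five separate values also keeps call sites like this one stable if further fields are added later.
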
11 changes: 7 additions & 4 deletions src/dbnode/persist/fs/read_write_test.go
@@ -573,16 +573,19 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) {

func readData(t *testing.T, reader DataFileSetReader) (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) {
if reader.StreamingEnabled() {
id, encodedTags, data, checksum, err := reader.StreamingRead()
entry, err := reader.StreamingRead()
if err != nil {
return nil, nil, nil, 0, err
}
var tags = ident.EmptyTagIterator
if len(encodedTags) > 0 {
if len(entry.EncodedTags) > 0 {
tagsDecoder := testTagDecoderPool.Get()
tagsDecoder.Reset(checkedBytes(encodedTags))
tagsDecoder.Reset(checkedBytes(entry.EncodedTags))
require.NoError(t, tagsDecoder.Err())
tags = tagsDecoder
}

return id, tags, checked.NewBytes(data, nil), checksum, err
return entry.ID, tags, checked.NewBytes(entry.Data, nil), entry.DataChecksum, err
}

return reader.Read()
28 changes: 24 additions & 4 deletions src/dbnode/persist/fs/types.go
@@ -153,9 +153,9 @@ type DataFileSetReader interface {

// StreamingRead returns the next unpooled id, encodedTags, data, checksum
// values ordered by id, or error, will return io.EOF at end of volume.
// Can only by used when DataReaderOpenOptions.StreamingEnabled is enabled.
// Note: the returned id, encodedTags and data get invalidated on the next call to StreamingRead.
StreamingRead() (id ident.BytesID, encodedTags ts.EncodedTags, data []byte, checksum uint32, err error)
// Can only by used when DataReaderOpenOptions.StreamingEnabled is true.
// Note: the returned data gets invalidated on the next call to StreamingRead.
StreamingRead() (StreamedDataEntry, error)

// ReadMetadata returns the next id and metadata or error, will return io.EOF at end of volume.
// Use either Read or ReadMetadata to progress through a volume, but not both.
@@ -216,7 +216,7 @@ type DataFileSetSeeker interface {

// SeekIndexEntry returns the IndexEntry for the specified ID. This can be useful
// ahead of issuing a number of seek requests so that the seek requests can be
// made in order. The returned IndexEntry can also be passed to SeekUsingIndexEntry
// made in order. The returned IndexEntry can also be passed to SeekByIndexEntry
// to prevent duplicate index lookups.
SeekIndexEntry(id ident.ID, resources ReusableSeekerResources) (IndexEntry, error)

@@ -677,3 +677,23 @@ type IndexClaimsManager interface {
blockStart time.Time,
) (int, error)
}

// StreamedDataEntry contains the data of single entry returned by streaming method(s).
// The underlying data slices are reused and invalidated on every read.
type StreamedDataEntry struct {
ID ident.BytesID
EncodedTags ts.EncodedTags
Data []byte
DataChecksum uint32
}

// NewReaderFn creates a new DataFileSetReader.
type NewReaderFn func(bytesPool pool.CheckedBytesPool, opts Options) (DataFileSetReader, error)

// DataEntryProcessor processes StreamedDataEntries.
type DataEntryProcessor interface {
// SetEntriesCount sets the number of entries to be processed.
SetEntriesCount(int)
// ProcessEntry processes a single StreamedDataEntry.
ProcessEntry(StreamedDataEntry) error
}
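
As a sketch of how the new interface might be satisfied, here is a hypothetical DataEntryProcessor. It is not part of this commit; the package, type, and field names are invented for illustration.

package example // hypothetical

import "github.com/m3db/m3/src/dbnode/persist/fs"

// countingProcessor tallies entries and payload bytes as a volume is scanned.
type countingProcessor struct {
	expected int // total entries announced via SetEntriesCount
	seen     int
	bytes    int
}

func (p *countingProcessor) SetEntriesCount(n int) { p.expected = n }

func (p *countingProcessor) ProcessEntry(entry fs.StreamedDataEntry) error {
	p.seen++
	p.bytes += len(entry.Data)
	return nil
}

// Compile-time check that the sketch satisfies the interface.
var _ fs.DataEntryProcessor = (*countingProcessor)(nil)

Because StreamedDataEntry slices are reused between reads, a processor that retains ID, EncodedTags, or Data beyond a single ProcessEntry call must copy them first.
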
16 changes: 12 additions & 4 deletions src/dbnode/persist/schema/index_entry_hasher.go
@@ -22,6 +22,9 @@ package schema

import (
"github.com/cespare/xxhash/v2"

"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/ident"
)

type xxHasher struct{}
@@ -31,10 +34,15 @@ func NewXXHasher() IndexEntryHasher {
return xxHasher{}
}

func (h xxHasher) HashIndexEntry(e IndexEntry) int64 {
func (x xxHasher) HashIndexEntry(
id ident.BytesID,
encodedTags ts.EncodedTags,
dataChecksum int64,
) int64 {
hash := uint64(7)
hash = 31*hash + xxhash.Sum64(e.ID)
hash = 31*hash + xxhash.Sum64(e.EncodedTags)
hash = 31*hash + uint64(e.DataChecksum)
hash = 31*hash + xxhash.Sum64(id)
hash = 31*hash + xxhash.Sum64(encodedTags)
hash = 31*hash + uint64(dataChecksum)

return int64(hash)
}
3 changes: 2 additions & 1 deletion src/dbnode/persist/schema/index_entry_hasher_test.go
@@ -65,6 +65,7 @@ func TestIndexEntryHash(t *testing.T) {

hasher := NewXXHasher()
for _, tt := range tests {
assert.Equal(t, tt.expected, hasher.HashIndexEntry(tt.entry))
e := tt.entry
assert.Equal(t, tt.expected, hasher.HashIndexEntry(e.ID, e.EncodedTags, e.DataChecksum))
}
}
12 changes: 10 additions & 2 deletions src/dbnode/persist/schema/types.go
@@ -22,6 +22,8 @@ package schema

import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/ident"
)

// MajorVersion is the major schema version for a set of fileset files,
@@ -90,9 +92,15 @@ type WideEntryFilter func(entry WideEntry) (bool, error)

// IndexEntryHasher hashes an index entry.
type IndexEntryHasher interface {
// HashIndexEntry computes a hash value for this IndexEntry using its ID, tags,
// HashIndexEntry computes a hash value for this index entry using its ID, tags,
// and the computed data checksum.
HashIndexEntry(e IndexEntry) int64
// NB: not passing the whole IndexEntry because of linter message:
// "hugeParam: e is heavy (88 bytes); consider passing it by pointer".
HashIndexEntry(
id ident.BytesID,
encodedTags ts.EncodedTags,
dataChecksum int64,
) int64
}

// IndexSummary stores a summary of an index entry to lookup.
@@ -43,7 +43,6 @@ import (
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
@@ -58,7 +57,6 @@ const (
type newIteratorFn func(opts commitlog.IteratorOpts) (
iter commitlog.Iterator, corruptFiles []commitlog.ErrorWithPath, err error)
type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error)
type newReaderFn func(bytesPool pool.CheckedBytesPool, opts fs.Options) (fs.DataFileSetReader, error)

type commitLogSource struct {
opts Options
@@ -70,7 +68,7 @@ type commitLogSource struct {

newIteratorFn newIteratorFn
snapshotFilesFn snapshotFilesFn
newReaderFn newReaderFn
newReaderFn fs.NewReaderFn

metrics commitLogSourceMetrics
// Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the
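
For illustration, exporting the reader constructor type as fs.NewReaderFn lets tests inject a mock reader factory where this source previously used its own local newReaderFn type. A hedged sketch follows; the package and the openReader helper are hypothetical, and it assumes fs.NewReader (the production constructor) is assignable to fs.NewReaderFn, as its signature at this commit suggests.

package example // hypothetical

import (
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/pool"
)

// openReader defaults to the production constructor; a test could assign
// a fs.NewReaderFn that returns a mock DataFileSetReader instead.
func openReader(bytesPool pool.CheckedBytesPool, fsOpts fs.Options) (fs.DataFileSetReader, error) {
	var newReader fs.NewReaderFn = fs.NewReader
	return newReader(bytesPool, fsOpts)
}
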
2 changes: 1 addition & 1 deletion src/dbnode/storage/index.go
@@ -1386,7 +1386,7 @@ func (i *nsIndex) WideQuery(
opts,
)

// NB: result should be fina.ized here, regardless of outcome
// NB: result should be finalized here, regardless of outcome
// to prevent deadlocking while waiting on channel close.
defer results.Finalize()
queryOpts := opts.ToQueryOptions()