-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dbnode] Add OrderedByIndex option for DataFileSetReader.Open #2465
Changes from 4 commits
cf04c4b
c55c21c
0f41464
367551a
6a1912f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,8 @@ var ( | |
|
||
// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index | ||
errReadNotExpectedSize = errors.New("next read not expected size") | ||
|
||
errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by Id") | ||
) | ||
|
||
const ( | ||
|
@@ -99,6 +101,8 @@ type reader struct { | |
shard uint32 | ||
volume int | ||
open bool | ||
|
||
orderedByIndex bool | ||
} | ||
|
||
// NewReader returns a new reader and expects all files to exist. Will read the | ||
|
@@ -151,6 +155,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { | |
dataFilepath string | ||
) | ||
|
||
r.orderedByIndex = opts.OrderedByIndex | ||
|
||
switch opts.FileSetType { | ||
case persist.FileSetSnapshotType: | ||
shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard) | ||
|
@@ -263,9 +269,13 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { | |
r.Close() | ||
return err | ||
} | ||
if err := r.readIndexAndSortByOffsetAsc(); err != nil { | ||
r.Close() | ||
return err | ||
if opts.OrderedByIndex { | ||
r.decoder.Reset(r.indexDecoderStream) | ||
} else { | ||
if err := r.readIndexAndSortByOffsetAsc(); err != nil { | ||
linasm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
r.Close() | ||
return err | ||
} | ||
} | ||
|
||
r.open = true | ||
|
@@ -282,7 +292,7 @@ func (r *reader) Status() DataFileSetReaderStatus { | |
Shard: r.shard, | ||
Volume: r.volume, | ||
BlockStart: r.start, | ||
BlockSize: time.Duration(r.blockSize), | ||
BlockSize: r.blockSize, | ||
} | ||
} | ||
|
||
|
@@ -329,6 +339,10 @@ func (r *reader) readInfo(size int) error { | |
} | ||
|
||
func (r *reader) readIndexAndSortByOffsetAsc() error { | ||
if r.orderedByIndex { | ||
return errUnexpectedSortByOffset | ||
} | ||
|
||
r.decoder.Reset(r.indexDecoderStream) | ||
for i := 0; i < r.entries; i++ { | ||
entry, err := r.decoder.DecodeIndexEntry(nil) | ||
|
@@ -344,6 +358,50 @@ func (r *reader) readIndexAndSortByOffsetAsc() error { | |
} | ||
|
||
func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { | ||
if r.orderedByIndex { | ||
return r.readInIndexedOrder() | ||
} | ||
return r.readInStoredOrder() | ||
} | ||
|
||
func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { | ||
if r.entriesRead >= r.entries { | ||
return nil, nil, nil, 0, io.EOF | ||
} | ||
|
||
entry, err := r.decoder.DecodeIndexEntry(nil) | ||
if err != nil { | ||
return nil, nil, nil, 0, err | ||
} | ||
|
||
var data checked.Bytes | ||
if r.bytesPool != nil { | ||
data = r.bytesPool.Get(int(entry.Size)) | ||
data.IncRef() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this is done in the original read call but it looks like Although I guess if the API exposed to downstream caller is that where nit: could also call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe the |
||
defer data.DecRef() | ||
} else { | ||
data = checked.NewBytes(make([]byte, 0, entry.Size), nil) | ||
data.IncRef() | ||
defer data.DecRef() | ||
} | ||
|
||
data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size]) | ||
|
||
// NB(r): _must_ check the checksum against known checksum as the data | ||
// file might not have been verified if we haven't read through the file yet. | ||
if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) { | ||
return nil, nil, nil, 0, errSeekChecksumMismatch | ||
} | ||
|
||
id := r.entryClonedID(entry.ID) | ||
tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) | ||
|
||
r.entriesRead++ | ||
|
||
return id, tags, data, uint32(entry.DataChecksum), nil | ||
} | ||
|
||
func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { | ||
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries { | ||
// Have not read the index yet, this is required when reading | ||
// data as we need each index entry in order by by the offset ascending | ||
|
@@ -386,6 +444,32 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, err | |
} | ||
|
||
func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) { | ||
if r.orderedByIndex { | ||
return r.readMetadataInIndexedOrder() | ||
} | ||
return r.readMetadataInStoredOrder() | ||
} | ||
|
||
func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) { | ||
if r.entriesRead >= r.entries { | ||
return nil, nil, 0, 0, io.EOF | ||
} | ||
|
||
entry, err := r.decoder.DecodeIndexEntry(nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we're not pooling for the msg pack decoder currently but it may be worth pooling? @robskillington thoughts on maybe pooling here going fwd? |
||
if err != nil { | ||
return nil, nil, 0, 0, err | ||
} | ||
|
||
id := r.entryClonedID(entry.ID) | ||
tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) | ||
length := int(entry.Size) | ||
checksum := uint32(entry.DataChecksum) | ||
|
||
r.metadataRead++ | ||
return id, tags, length, checksum, nil | ||
} | ||
|
||
func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) { | ||
if r.metadataRead >= r.entries { | ||
return nil, nil, 0, 0, io.EOF | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can remove this, I don't think this can ever hit in any current path? If we want to keep it for the sake of being extra defensive, we should have a similar error for calling
readInIndexedOrder
withorderedByIndex==false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember adding this check to ensure
readIndexAndSortByOffsetAsc
is not called fromOpen
when it is not needed. It is expensive operation, would be effectively a noop withOrderedByIndex=true
, and I cannot check whether it was called or not from tests.OTOH the choice between
readInIndexedOrder
vsreadInStoredOrder
is really explicit and colocated, I feel that such checks there would be excessive.