[dbnode] Add OrderedByIndex option for DataFileSetReader.Open #2465

Merged 5 commits on Aug 11, 2020
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/index_read.go
@@ -207,7 +207,7 @@ func (r *indexReader) ReadSegmentFileSet() (
)
success := false
defer func() {
// Do not close opened files if read finishes sucessfully.
// Do not close opened files if read finishes successfully.
if success {
return
}
86 changes: 84 additions & 2 deletions src/dbnode/persist/fs/read.go
@@ -50,6 +50,8 @@ var (

// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index
errReadNotExpectedSize = errors.New("next read not expected size")

errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by Id")
arnikola (Collaborator) commented on Aug 10, 2020:

nit: we can remove this; I don't think this can ever be hit in any current path. If we want to keep it for the sake of being extra defensive, we should have a similar error for calling readInIndexedOrder with orderedByIndex==false.

The PR author (Collaborator) replied:

I remember adding this check to ensure readIndexAndSortByOffsetAsc is not called from Open when it is not needed. It is an expensive operation, it would effectively be a noop with OrderedByIndex=true, and I cannot check from tests whether it was called or not.
OTOH, the choice between readInIndexedOrder vs readInStoredOrder is explicit and colocated, so I feel such checks there would be excessive.

)

const (
@@ -99,6 +101,8 @@ type reader struct {
shard uint32
volume int
open bool

orderedByIndex bool
}

// NewReader returns a new reader and expects all files to exist. Will read the
@@ -151,6 +155,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
dataFilepath string
)

r.orderedByIndex = opts.OrderedByIndex

switch opts.FileSetType {
case persist.FileSetSnapshotType:
shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard)
@@ -263,7 +269,9 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
r.Close()
return err
}
if err := r.readIndexAndSortByOffsetAsc(); err != nil {
if opts.OrderedByIndex {
r.decoder.Reset(r.indexDecoderStream)
} else if err := r.readIndexAndSortByOffsetAsc(); err != nil {
r.Close()
return err
}
@@ -282,7 +290,7 @@ func (r *reader) Status() DataFileSetReaderStatus {
Shard: r.shard,
Volume: r.volume,
BlockStart: r.start,
BlockSize: time.Duration(r.blockSize),
BlockSize: r.blockSize,
}
}

@@ -329,6 +337,10 @@ func (r *reader) readInfo(size int) error {
}

func (r *reader) readIndexAndSortByOffsetAsc() error {
if r.orderedByIndex {
return errUnexpectedSortByOffset
}

r.decoder.Reset(r.indexDecoderStream)
for i := 0; i < r.entries; i++ {
entry, err := r.decoder.DecodeIndexEntry(nil)
@@ -344,6 +356,50 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.orderedByIndex {
return r.readInIndexedOrder()
}
return r.readInStoredOrder()
}

func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.entriesRead >= r.entries {
return nil, nil, nil, 0, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
if err != nil {
return nil, nil, nil, 0, err
}

var data checked.Bytes
if r.bytesPool != nil {
data = r.bytesPool.Get(int(entry.Size))
data.IncRef()
A contributor commented:

I know this is done in the original read call, but it looks like data is returned at the end of this function. If we're transferring ownership of data, shouldn't we wait for a downstream caller to DecRef() once it is done with the underlying bytes?

Although I guess if the API exposed to the downstream caller is that data is only valid for the duration of the Read() call, it's probably not so bad? I'm not sure. @robskillington thoughts?

nit: could also call data.IncRef() below this conditional

A collaborator replied:

I believe the data here is meant to mirror an iterator approach, where the downstream caller calls Read, does whatever it needs to do with the entry, then calls Read again, which invalidates the original.

defer data.DecRef()
} else {
data = checked.NewBytes(make([]byte, 0, entry.Size), nil)
data.IncRef()
defer data.DecRef()
}

data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size])

// NB(r): _must_ check the checksum against known checksum as the data
// file might not have been verified if we haven't read through the file yet.
if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) {
return nil, nil, nil, 0, errSeekChecksumMismatch
}

id := r.entryClonedID(entry.ID)
tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)

r.entriesRead++

return id, tags, data, uint32(entry.DataChecksum), nil
}

func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
// Have not read the index yet, this is required when reading
// data as we need each index entry in order by by the offset ascending
@@ -386,6 +442,32 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, err
}

func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.orderedByIndex {
return r.readMetadataInIndexedOrder()
}
return r.readMetadataInStoredOrder()
}

func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.entriesRead >= r.entries {
return nil, nil, 0, 0, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
A contributor commented:

Looks like we're not pooling for the msgpack decoder currently, but it may be worth pooling? @robskillington, thoughts on maybe pooling here going forward?

if err != nil {
return nil, nil, 0, 0, err
}

id := r.entryClonedID(entry.ID)
tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)
length := int(entry.Size)
checksum := uint32(entry.DataChecksum)

r.metadataRead++
return id, tags, length, checksum, nil
}

func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.metadataRead >= r.entries {
return nil, nil, 0, 0, io.EOF
}
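
To make the ownership discussion in the review thread above concrete: the contract being described is iterator-style, i.e. the checked.Bytes returned by Read are only guaranteed to be valid until the next Read (or Close) call. The caller-side sketch below is not part of this diff; it only illustrates that contract, and handleSeries is a hypothetical callback.

package example // illustrative sketch only, not part of the PR

import (
	"io"

	"github.com/m3db/m3/src/dbnode/persist/fs"
)

// drainReader walks every series entry of an already-opened
// DataFileSetReader and copies each payload before the next Read call,
// since the returned bytes may be pooled and reused by the reader.
func drainReader(r fs.DataFileSetReader, handleSeries func(id string, payload []byte)) error {
	for {
		id, tags, data, _, err := r.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		// Copy now: the underlying buffer is only valid until the next Read.
		payload := append([]byte(nil), data.Bytes()...)
		handleSeries(id.String(), payload)

		// Releasing id/tags/data (Finalize/Close/DecRef) follows whatever
		// ownership contract is settled in the thread above; closing the
		// tag iterator mirrors the existing read tests.
		tags.Close()
	}
}
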
28 changes: 14 additions & 14 deletions src/dbnode/persist/fs/read_test.go
@@ -144,7 +144,7 @@ func TestReadEmptyIndexUnreadData(t *testing.T) {
assert.NoError(t, err)

_, _, _, _, err = r.Read()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)

assert.NoError(t, r.Close())
}
@@ -311,7 +311,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) {
BlockSize: testBlockSize,
Identifier: FileSetFileIdentifier{
Namespace: testNs1ID,
Shard: uint32(shard),
Shard: shard,
BlockStart: start,
},
}
@@ -350,11 +350,11 @@ func TestReadOpenDigestOfDigestMismatch(t *testing.T) {
testReadOpen(
t,
map[string][]byte{
infoFileSuffix: []byte{0x1},
indexFileSuffix: []byte{0x2},
dataFileSuffix: []byte{0x3},
digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
checkpointFileSuffix: []byte{0x12, 0x0, 0x7a, 0x0},
infoFileSuffix: {0x1},
indexFileSuffix: {0x2},
dataFileSuffix: {0x3},
digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
checkpointFileSuffix: {0x12, 0x0, 0x7a, 0x0},
},
)
}
@@ -363,11 +363,11 @@ func TestReadOpenInfoDigestMismatch(t *testing.T) {
testReadOpen(
t,
map[string][]byte{
infoFileSuffix: []byte{0xa},
indexFileSuffix: []byte{0x2},
dataFileSuffix: []byte{0x3},
digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
checkpointFileSuffix: []byte{0x13, 0x0, 0x7a, 0x0},
infoFileSuffix: {0xa},
indexFileSuffix: {0x2},
dataFileSuffix: {0x3},
digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
checkpointFileSuffix: {0x13, 0x0, 0x7a, 0x0},
},
)
}
@@ -388,8 +388,8 @@ func TestReadOpenIndexDigestMismatch(t *testing.T) {
t,
map[string][]byte{
infoFileSuffix: b,
indexFileSuffix: []byte{0xa},
dataFileSuffix: []byte{0x3},
indexFileSuffix: {0xa},
dataFileSuffix: {0x3},
digestFileSuffix: digestOfDigest,
checkpointFileSuffix: buf,
},
36 changes: 34 additions & 2 deletions src/dbnode/persist/fs/read_write_test.go
@@ -75,6 +75,20 @@ func (e testEntry) Tags() ident.Tags {
return tags
}

type testEntries []testEntry

func (e testEntries) Less(i, j int) bool {
return e[i].id < e[j].id
}

func (e testEntries) Len() int {
return len(e)
}

func (e testEntries) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

func newTestWriter(t *testing.T, filePathPrefix string) DataFileSetWriter {
writer, err := NewWriter(testDefaultOpts.
SetFilePathPrefix(filePathPrefix).
@@ -158,20 +172,37 @@ var readTestTypes = []readTestType{
readTestTypeMetadata,
}

func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
readTestDataWithOrderOpt(t, r, shard, timestamp, entries, false)

sortedEntries := append(make(testEntries, 0, len(entries)), entries...)
sort.Sort(sortedEntries)

readTestDataWithOrderOpt(t, r, shard, timestamp, sortedEntries, true)
}

// readTestData will test reading back the data matches what was written,
// note that this test also tests reuse of the reader since it first reads
// all the data then closes it, reopens and reads through again but just
// reading the metadata the second time.
// If it starts to fail during the pass that reads just the metadata it could
// be a newly introduced reader reuse bug.
func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
func readTestDataWithOrderOpt(
t *testing.T,
r DataFileSetReader,
shard uint32,
timestamp time.Time,
entries []testEntry,
orderByIndex bool,
) {
for _, underTest := range readTestTypes {
rOpenOpts := DataReaderOpenOptions{
Identifier: FileSetFileIdentifier{
Namespace: testNs1ID,
Shard: 0,
Shard: shard,
BlockStart: timestamp,
},
OrderedByIndex: orderByIndex,
}
err := r.Open(rOpenOpts)
require.NoError(t, err)
@@ -220,6 +251,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim
tags.Close()
data.DecRef()
data.Finalize()

case readTestTypeMetadata:
id, tags, length, checksum, err := r.ReadMetadata()
require.NoError(t, err)
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/seek_manager.go
@@ -467,7 +467,7 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s
// 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker,
// and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine
// running the UpdateOpenlease() function that all inactive seekers have been returned and closed at
// which point the function will return sucessfully.
// which point the function will return successfully.
func (m *seekerManager) UpdateOpenLease(
descriptor block.LeaseDescriptor,
state block.LeaseState,
6 changes: 4 additions & 2 deletions src/dbnode/persist/fs/types.go
@@ -120,8 +120,10 @@ type DataFileSetReaderStatus struct {

// DataReaderOpenOptions is options struct for the reader open method.
type DataReaderOpenOptions struct {
Identifier FileSetFileIdentifier
FileSetType persist.FileSetType
Identifier FileSetFileIdentifier
FileSetType persist.FileSetType
// OrderedByIndex enforces reading of series in the order of index (which is by series Id).
OrderedByIndex bool
}

// DataFileSetReader provides an unsynchronized reader for a TSDB file set
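
As a usage illustration of the new OrderedByIndex option (a sketch, not code from this PR): opening a flushed fileset and streaming its metadata back in series-Id order could look roughly like the following. The namespace name, shard, and block start are placeholders, and error handling is minimal.

package example // illustrative sketch only, not part of the PR

import (
	"io"
	"time"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

// streamMetadataOrderedByID opens a flushed fileset and walks its metadata
// entries in index (series Id) order using the OrderedByIndex option.
func streamMetadataOrderedByID(reader fs.DataFileSetReader, blockStart time.Time) error {
	openOpts := fs.DataReaderOpenOptions{
		Identifier: fs.FileSetFileIdentifier{
			Namespace:  ident.StringID("metrics"), // placeholder namespace
			Shard:      42,                        // placeholder shard
			BlockStart: blockStart,
		},
		FileSetType:    persist.FileSetFlushType,
		OrderedByIndex: true, // entries come back sorted by series Id
	}
	if err := reader.Open(openOpts); err != nil {
		return err
	}
	defer reader.Close()

	for {
		id, tags, length, checksum, err := reader.ReadMetadata()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		_ = id       // series Id; ascending across iterations
		_ = length   // size of the series' data payload
		_ = checksum // data checksum from the index entry
		tags.Close()
	}
}
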
2 changes: 1 addition & 1 deletion src/dbnode/storage/namespace.go
@@ -1229,7 +1229,7 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error {
// We go through this error checking process to allow for partially successful flushes.
indexColdFlushError := onColdFlushNs.Done()
if indexColdFlushError == nil && onColdFlushDone != nil {
// Only evict rotated cold mutable index segments if the index cold flush was sucessful
// Only evict rotated cold mutable index segments if the index cold flush was successful
// or we will lose queryability of data that's still in mem.
indexColdFlushError = onColdFlushDone()
}