[dbnode] Add OrderedByIndex option for DataFileSetReader.Open (#2465)
linasm authored Aug 11, 2020
1 parent aad2375 commit f9c465d
Showing 7 changed files with 139 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/index_read.go
@@ -207,7 +207,7 @@ func (r *indexReader) ReadSegmentFileSet() (
)
success := false
defer func() {
- // Do not close opened files if read finishes sucessfully.
+ // Do not close opened files if read finishes successfully.
if success {
return
}
86 changes: 84 additions & 2 deletions src/dbnode/persist/fs/read.go
@@ -50,6 +52,8 @@ var (

// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index
errReadNotExpectedSize = errors.New("next read not expected size")

errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by Id")
)

const (
@@ -99,6 +101,8 @@ type reader struct {
shard uint32
volume int
open bool

orderedByIndex bool
}

// NewReader returns a new reader and expects all files to exist. Will read the
@@ -151,6 +155,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
dataFilepath string
)

r.orderedByIndex = opts.OrderedByIndex

switch opts.FileSetType {
case persist.FileSetSnapshotType:
shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard)
@@ -263,7 +269,9 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
r.Close()
return err
}
- if err := r.readIndexAndSortByOffsetAsc(); err != nil {
+ if opts.OrderedByIndex {
+     r.decoder.Reset(r.indexDecoderStream)
+ } else if err := r.readIndexAndSortByOffsetAsc(); err != nil {
r.Close()
return err
}
@@ -282,7 +290,7 @@ func (r *reader) Status() DataFileSetReaderStatus {
Shard: r.shard,
Volume: r.volume,
BlockStart: r.start,
- BlockSize: time.Duration(r.blockSize),
+ BlockSize: r.blockSize,
}
}

@@ -329,6 +337,10 @@ func (r *reader) readInfo(size int) error {
}

func (r *reader) readIndexAndSortByOffsetAsc() error {
if r.orderedByIndex {
return errUnexpectedSortByOffset
}

r.decoder.Reset(r.indexDecoderStream)
for i := 0; i < r.entries; i++ {
entry, err := r.decoder.DecodeIndexEntry(nil)
@@ -344,6 +356,50 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.orderedByIndex {
return r.readInIndexedOrder()
}
return r.readInStoredOrder()
}

func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.entriesRead >= r.entries {
return nil, nil, nil, 0, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
if err != nil {
return nil, nil, nil, 0, err
}

var data checked.Bytes
if r.bytesPool != nil {
data = r.bytesPool.Get(int(entry.Size))
data.IncRef()
defer data.DecRef()
} else {
data = checked.NewBytes(make([]byte, 0, entry.Size), nil)
data.IncRef()
defer data.DecRef()
}

data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size])

// NB(r): _must_ check the checksum against known checksum as the data
// file might not have been verified if we haven't read through the file yet.
if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) {
return nil, nil, nil, 0, errSeekChecksumMismatch
}

id := r.entryClonedID(entry.ID)
tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)

r.entriesRead++

return id, tags, data, uint32(entry.DataChecksum), nil
}

func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
// Have not read the index yet, this is required when reading
// data as we need each index entry in order by the offset ascending
@@ -386,6 +442,32 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, err
}

func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.orderedByIndex {
return r.readMetadataInIndexedOrder()
}
return r.readMetadataInStoredOrder()
}

func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.entriesRead >= r.entries {
return nil, nil, 0, 0, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
if err != nil {
return nil, nil, 0, 0, err
}

id := r.entryClonedID(entry.ID)
tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)
length := int(entry.Size)
checksum := uint32(entry.DataChecksum)

r.metadataRead++
return id, tags, length, checksum, nil
}

func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.metadataRead >= r.entries {
return nil, nil, 0, 0, io.EOF
}
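For orientation, the sketch below shows how a caller might drive the new index-ordered read path added above: open the reader with OrderedByIndex set and iterate Read until io.EOF. This is not part of the commit; the helper name readOrderedByID is hypothetical, a flushed fileset is assumed to exist for the given namespace/shard/block start, and error handling is abbreviated.

package fsexample

import (
	"fmt"
	"io"
	"time"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

// readOrderedByID (hypothetical helper) opens a flushed fileset and reads every
// series back in index order, i.e. sorted by series ID.
func readOrderedByID(opts fs.Options, ns ident.ID, shard uint32, blockStart time.Time) error {
	reader, err := fs.NewReader(nil, opts) // nil bytes pool: the reader allocates per read
	if err != nil {
		return err
	}
	openOpts := fs.DataReaderOpenOptions{
		Identifier: fs.FileSetFileIdentifier{
			Namespace:  ns,
			Shard:      shard,
			BlockStart: blockStart,
		},
		FileSetType: persist.FileSetFlushType,
		// New in this commit: stream entries in index (series ID) order instead
		// of sorting the index by data offset up front.
		OrderedByIndex: true,
	}
	if err := reader.Open(openOpts); err != nil {
		return err
	}
	defer reader.Close()

	for {
		id, tags, data, checksum, err := reader.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		// Take a reference before inspecting the bytes, then release and finalize,
		// mirroring what the package's own read tests do with returned values.
		data.IncRef()
		fmt.Printf("%s bytes=%d checksum=%d\n", id.String(), len(data.Bytes()), checksum)
		data.DecRef()
		data.Finalize()
		tags.Close()
	}
}

When OrderedByIndex is false (the default), Open still eagerly reads and sorts the index by data offset, so existing callers keep the stored-order behaviour.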
28 changes: 14 additions & 14 deletions src/dbnode/persist/fs/read_test.go
@@ -144,7 +144,7 @@ func TestReadEmptyIndexUnreadData(t *testing.T) {
assert.NoError(t, err)

_, _, _, _, err = r.Read()
- assert.Error(t, err)
+ assert.Equal(t, io.EOF, err)

assert.NoError(t, r.Close())
}
@@ -311,7 +311,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) {
BlockSize: testBlockSize,
Identifier: FileSetFileIdentifier{
Namespace: testNs1ID,
- Shard: uint32(shard),
+ Shard: shard,
BlockStart: start,
},
}
@@ -350,11 +350,11 @@ func TestReadOpenDigestOfDigestMismatch(t *testing.T) {
testReadOpen(
t,
map[string][]byte{
- infoFileSuffix: []byte{0x1},
- indexFileSuffix: []byte{0x2},
- dataFileSuffix: []byte{0x3},
- digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
- checkpointFileSuffix: []byte{0x12, 0x0, 0x7a, 0x0},
+ infoFileSuffix: {0x1},
+ indexFileSuffix: {0x2},
+ dataFileSuffix: {0x3},
+ digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
+ checkpointFileSuffix: {0x12, 0x0, 0x7a, 0x0},
},
)
}
@@ -363,11 +363,11 @@ func TestReadOpenInfoDigestMismatch(t *testing.T) {
testReadOpen(
t,
map[string][]byte{
- infoFileSuffix: []byte{0xa},
- indexFileSuffix: []byte{0x2},
- dataFileSuffix: []byte{0x3},
- digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
- checkpointFileSuffix: []byte{0x13, 0x0, 0x7a, 0x0},
+ infoFileSuffix: {0xa},
+ indexFileSuffix: {0x2},
+ dataFileSuffix: {0x3},
+ digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
+ checkpointFileSuffix: {0x13, 0x0, 0x7a, 0x0},
},
)
}
@@ -388,8 +388,8 @@ func TestReadOpenIndexDigestMismatch(t *testing.T) {
t,
map[string][]byte{
infoFileSuffix: b,
- indexFileSuffix: []byte{0xa},
- dataFileSuffix: []byte{0x3},
+ indexFileSuffix: {0xa},
+ dataFileSuffix: {0x3},
digestFileSuffix: digestOfDigest,
checkpointFileSuffix: buf,
},
36 changes: 34 additions & 2 deletions src/dbnode/persist/fs/read_write_test.go
@@ -75,6 +75,20 @@ func (e testEntry) Tags() ident.Tags {
return tags
}

type testEntries []testEntry

func (e testEntries) Less(i, j int) bool {
return e[i].id < e[j].id
}

func (e testEntries) Len() int {
return len(e)
}

func (e testEntries) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

func newTestWriter(t *testing.T, filePathPrefix string) DataFileSetWriter {
writer, err := NewWriter(testDefaultOpts.
SetFilePathPrefix(filePathPrefix).
@@ -158,20 +172,37 @@ var readTestTypes = []readTestType{
readTestTypeMetadata,
}

func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
readTestDataWithOrderOpt(t, r, shard, timestamp, entries, false)

sortedEntries := append(make(testEntries, 0, len(entries)), entries...)
sort.Sort(sortedEntries)

readTestDataWithOrderOpt(t, r, shard, timestamp, sortedEntries, true)
}

// readTestData will test reading back the data matches what was written,
// note that this test also tests reuse of the reader since it first reads
// all the data then closes it, reopens and reads through again but just
// reading the metadata the second time.
// If it starts to fail during the pass that reads just the metadata it could
// be a newly introduced reader reuse bug.
- func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
+ func readTestDataWithOrderOpt(
+     t *testing.T,
+     r DataFileSetReader,
+     shard uint32,
+     timestamp time.Time,
+     entries []testEntry,
+     orderByIndex bool,
+ ) {
for _, underTest := range readTestTypes {
rOpenOpts := DataReaderOpenOptions{
Identifier: FileSetFileIdentifier{
Namespace: testNs1ID,
- Shard: 0,
+ Shard: shard,
BlockStart: timestamp,
},
OrderedByIndex: orderByIndex,
}
err := r.Open(rOpenOpts)
require.NoError(t, err)
@@ -220,6 +251,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim
tags.Close()
data.DecRef()
data.Finalize()

case readTestTypeMetadata:
id, tags, length, checksum, err := r.ReadMetadata()
require.NoError(t, err)
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/seek_manager.go
@@ -467,7 +467,7 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s
// 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker,
// and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine
// running the UpdateOpenlease() function that all inactive seekers have been returned and closed at
- // which point the function will return sucessfully.
+ // which point the function will return successfully.
func (m *seekerManager) UpdateOpenLease(
descriptor block.LeaseDescriptor,
state block.LeaseState,
6 changes: 4 additions & 2 deletions src/dbnode/persist/fs/types.go
@@ -120,8 +120,10 @@ type DataFileSetReaderStatus struct {

// DataReaderOpenOptions is options struct for the reader open method.
type DataReaderOpenOptions struct {
- Identifier  FileSetFileIdentifier
- FileSetType persist.FileSetType
+ Identifier     FileSetFileIdentifier
+ FileSetType    persist.FileSetType
+ // OrderedByIndex enforces reading of series in the order of index (which is by series Id).
+ OrderedByIndex bool
}

// DataFileSetReader provides an unsynchronized reader for a TSDB file set
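The new DataReaderOpenOptions field also governs metadata-only scans: with OrderedByIndex set, ReadMetadata streams index entries in series ID order as well. Below is a minimal sketch under the same assumptions as before (hypothetical helper listSeriesMetadata, an existing fileset, abbreviated error handling); it is illustrative, not part of the commit.

package fsexample

import (
	"fmt"
	"io"

	"github.com/m3db/m3/src/dbnode/persist/fs"
)

// listSeriesMetadata (hypothetical helper) scans only the index entries of a
// fileset, in series ID order, without reading the data file contents.
func listSeriesMetadata(reader fs.DataFileSetReader, id fs.FileSetFileIdentifier) error {
	openOpts := fs.DataReaderOpenOptions{
		Identifier:     id,
		// FileSetType left at its zero value (flush fileset).
		OrderedByIndex: true, // entries come back sorted by series ID
	}
	if err := reader.Open(openOpts); err != nil {
		return err
	}
	defer reader.Close()

	for {
		seriesID, tags, length, checksum, err := reader.ReadMetadata()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Printf("%s size=%d checksum=%d\n", seriesID.String(), length, checksum)
		tags.Close()
	}
}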
2 changes: 1 addition & 1 deletion src/dbnode/storage/namespace.go
@@ -1229,7 +1229,7 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error {
// We go through this error checking process to allow for partially successful flushes.
indexColdFlushError := onColdFlushNs.Done()
if indexColdFlushError == nil && onColdFlushDone != nil {
- // Only evict rotated cold mutable index segments if the index cold flush was sucessful
+ // Only evict rotated cold mutable index segments if the index cold flush was successful
// or we will lose queryability of data that's still in mem.
indexColdFlushError = onColdFlushDone()
}
