diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go
index 88712236dc..773c7b5291 100644
--- a/src/dbnode/persist/fs/index_read.go
+++ b/src/dbnode/persist/fs/index_read.go
@@ -207,7 +207,7 @@ func (r *indexReader) ReadSegmentFileSet() (
 	)
 	success := false
 	defer func() {
-		// Do not close opened files if read finishes sucessfully.
+		// Do not close opened files if read finishes successfully.
 		if success {
 			return
 		}
diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go
index 9805f06170..7c55277ad9 100644
--- a/src/dbnode/persist/fs/read.go
+++ b/src/dbnode/persist/fs/read.go
@@ -50,6 +50,8 @@ var (
 
 	// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index
 	errReadNotExpectedSize = errors.New("next read not expected size")
+
+	errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by Id")
 )
 
 const (
@@ -99,6 +101,8 @@ type reader struct {
 	shard          uint32
 	volume         int
 	open           bool
+
+	orderedByIndex bool
 }
 
 // NewReader returns a new reader and expects all files to exist. Will read the
@@ -151,6 +155,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
 		dataFilepath string
 	)
 
+	r.orderedByIndex = opts.OrderedByIndex
+
 	switch opts.FileSetType {
 	case persist.FileSetSnapshotType:
 		shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard)
@@ -263,7 +269,9 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
 		r.Close()
 		return err
 	}
-	if err := r.readIndexAndSortByOffsetAsc(); err != nil {
+	if opts.OrderedByIndex {
+		r.decoder.Reset(r.indexDecoderStream)
+	} else if err := r.readIndexAndSortByOffsetAsc(); err != nil {
 		r.Close()
 		return err
 	}
@@ -282,7 +290,7 @@ func (r *reader) Status() DataFileSetReaderStatus {
 		Shard:      r.shard,
 		Volume:     r.volume,
 		BlockStart: r.start,
-		BlockSize:  time.Duration(r.blockSize),
+		BlockSize:  r.blockSize,
 	}
 }
 
@@ -329,6 +337,10 @@ func (r *reader) readInfo(size int) error {
 }
 
 func (r *reader) readIndexAndSortByOffsetAsc() error {
+	if r.orderedByIndex {
+		return errUnexpectedSortByOffset
+	}
+
 	r.decoder.Reset(r.indexDecoderStream)
 	for i := 0; i < r.entries; i++ {
 		entry, err := r.decoder.DecodeIndexEntry(nil)
@@ -344,6 +356,50 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
 }
 
 func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
+	if r.orderedByIndex {
+		return r.readInIndexedOrder()
+	}
+	return r.readInStoredOrder()
+}
+
+func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
+	if r.entriesRead >= r.entries {
+		return nil, nil, nil, 0, io.EOF
+	}
+
+	entry, err := r.decoder.DecodeIndexEntry(nil)
+	if err != nil {
+		return nil, nil, nil, 0, err
+	}
+
+	var data checked.Bytes
+	if r.bytesPool != nil {
+		data = r.bytesPool.Get(int(entry.Size))
+		data.IncRef()
+		defer data.DecRef()
+	} else {
+		data = checked.NewBytes(make([]byte, 0, entry.Size), nil)
+		data.IncRef()
+		defer data.DecRef()
+	}
+
+	data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size])
+
+	// NB(r): _must_ check the checksum against known checksum as the data
+	// file might not have been verified if we haven't read through the file yet.
+	if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) {
+		return nil, nil, nil, 0, errSeekChecksumMismatch
+	}
+
+	id := r.entryClonedID(entry.ID)
+	tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)
+
+	r.entriesRead++
+
+	return id, tags, data, uint32(entry.DataChecksum), nil
+}
+
+func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
 	if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
 		// Have not read the index yet, this is required when reading
 		// data as we need each index entry in order by by the offset ascending
@@ -386,6 +442,32 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
 }
 
 func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) {
+	if r.orderedByIndex {
+		return r.readMetadataInIndexedOrder()
+	}
+	return r.readMetadataInStoredOrder()
+}
+
+func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
+	if r.metadataRead >= r.entries {
+		return nil, nil, 0, 0, io.EOF
+	}
+
+	entry, err := r.decoder.DecodeIndexEntry(nil)
+	if err != nil {
+		return nil, nil, 0, 0, err
+	}
+
+	id := r.entryClonedID(entry.ID)
+	tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)
+	length := int(entry.Size)
+	checksum := uint32(entry.DataChecksum)
+
+	r.metadataRead++
+	return id, tags, length, checksum, nil
+}
+
+func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
	if r.metadataRead >= r.entries {
 		return nil, nil, 0, 0, io.EOF
 	}
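The read.go changes above split the reader into two mutually exclusive modes: the existing stored-order path still loads the whole index up front and sorts it by offset, while the new indexed-order path decodes one index entry per call, in ascending series-ID order, and checksums every payload because the data file has not necessarily been verified end to end. `Open` only resets the index decoder stream in ordered mode, and `readIndexAndSortByOffsetAsc` now fails fast with `errUnexpectedSortByOffset` if the modes are ever mixed. A minimal consumer sketch, using only names from the hunks above; the namespace, shard, and block start are placeholders, and `consume` is a hypothetical callback:

```go
package fs_test // hypothetical caller; persist/fs is the package patched above

import (
	"io"
	"time"

	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

// readInIDOrder drains a fileset in ascending series-ID order. With
// OrderedByIndex set, each Read decodes the next index entry directly
// off the index stream and returns io.EOF once all entries are consumed.
func readInIDOrder(r fs.DataFileSetReader, blockStart time.Time, consume func(ident.ID, []byte)) error {
	openOpts := fs.DataReaderOpenOptions{
		Identifier: fs.FileSetFileIdentifier{
			Namespace:  ident.StringID("testNs"), // placeholder namespace
			Shard:      0,                        // placeholder shard
			BlockStart: blockStart,
		},
		OrderedByIndex: true,
	}
	if err := r.Open(openOpts); err != nil {
		return err
	}
	for {
		id, tags, data, _, err := r.Read()
		if err == io.EOF {
			break // entries exhausted
		}
		if err != nil {
			return err
		}
		data.IncRef()
		consume(id, data.Bytes())
		data.DecRef()
		data.Finalize() // same cleanup sequence the tests below use
		tags.Close()
	}
	return r.Close()
}
```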
diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go
index 5f6c3ca6aa..15eb299eaa 100644
--- a/src/dbnode/persist/fs/read_test.go
+++ b/src/dbnode/persist/fs/read_test.go
@@ -144,7 +144,7 @@ func TestReadEmptyIndexUnreadData(t *testing.T) {
 	assert.NoError(t, err)
 
 	_, _, _, _, err = r.Read()
-	assert.Error(t, err)
+	assert.Equal(t, io.EOF, err)
 
 	assert.NoError(t, r.Close())
 }
@@ -311,7 +311,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) {
 		BlockSize: testBlockSize,
 		Identifier: FileSetFileIdentifier{
 			Namespace:  testNs1ID,
-			Shard:      uint32(shard),
+			Shard:      shard,
 			BlockStart: start,
 		},
 	}
@@ -350,11 +350,11 @@ func TestReadOpenDigestOfDigestMismatch(t *testing.T) {
 	testReadOpen(
 		t,
 		map[string][]byte{
-			infoFileSuffix:       []byte{0x1},
-			indexFileSuffix:      []byte{0x2},
-			dataFileSuffix:       []byte{0x3},
-			digestFileSuffix:     []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
-			checkpointFileSuffix: []byte{0x12, 0x0, 0x7a, 0x0},
+			infoFileSuffix:       {0x1},
+			indexFileSuffix:      {0x2},
+			dataFileSuffix:       {0x3},
+			digestFileSuffix:     {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
+			checkpointFileSuffix: {0x12, 0x0, 0x7a, 0x0},
 		},
 	)
 }
@@ -363,11 +363,11 @@ func TestReadOpenInfoDigestMismatch(t *testing.T) {
 	testReadOpen(
 		t,
 		map[string][]byte{
-			infoFileSuffix:       []byte{0xa},
-			indexFileSuffix:      []byte{0x2},
-			dataFileSuffix:       []byte{0x3},
-			digestFileSuffix:     []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
-			checkpointFileSuffix: []byte{0x13, 0x0, 0x7a, 0x0},
+			infoFileSuffix:       {0xa},
+			indexFileSuffix:      {0x2},
+			dataFileSuffix:       {0x3},
+			digestFileSuffix:     {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
+			checkpointFileSuffix: {0x13, 0x0, 0x7a, 0x0},
 		},
 	)
 }
@@ -388,8 +388,8 @@ func TestReadOpenIndexDigestMismatch(t *testing.T) {
 		t,
 		map[string][]byte{
 			infoFileSuffix:       b,
-			indexFileSuffix:      []byte{0xa},
-			dataFileSuffix:       []byte{0x3},
+			indexFileSuffix:      {0xa},
+			dataFileSuffix:       {0x3},
 			digestFileSuffix:     digestOfDigest,
 			checkpointFileSuffix: buf,
 		},
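Two distinct cleanups above: the empty-fileset assertion is tightened from "any error" to the `io.EOF` sentinel that both new indexed-order paths return on exhaustion, and the byte-map literals are the `gofmt -s` simplification, which elides element types the map's value type already spells out. A purely illustrative sketch of that shorthand, reusing the suffix constant from the test file:

```go
// Under gofmt -s, the element type of a composite literal is dropped
// when the surrounding map type already declares it.
fileData := map[string][]byte{
	infoFileSuffix: {0x1}, // shorthand for []byte{0x1}
}
_ = fileData
```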
diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go
index 521f9df19f..630167e29e 100644
--- a/src/dbnode/persist/fs/read_write_test.go
+++ b/src/dbnode/persist/fs/read_write_test.go
@@ -75,6 +75,20 @@ func (e testEntry) Tags() ident.Tags {
 	return tags
 }
 
+type testEntries []testEntry
+
+func (e testEntries) Less(i, j int) bool {
+	return e[i].id < e[j].id
+}
+
+func (e testEntries) Len() int {
+	return len(e)
+}
+
+func (e testEntries) Swap(i, j int) {
+	e[i], e[j] = e[j], e[i]
+}
+
 func newTestWriter(t *testing.T, filePathPrefix string) DataFileSetWriter {
 	writer, err := NewWriter(testDefaultOpts.
 		SetFilePathPrefix(filePathPrefix).
@@ -158,20 +172,37 @@ var readTestTypes = []readTestType{
 	readTestTypeMetadata,
 }
 
+func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
+	readTestDataWithOrderOpt(t, r, shard, timestamp, entries, false)
+
+	sortedEntries := append(make(testEntries, 0, len(entries)), entries...)
+	sort.Sort(sortedEntries)
+
+	readTestDataWithOrderOpt(t, r, shard, timestamp, sortedEntries, true)
+}
+
 // readTestData will test reading back the data matches what was written,
 // note that this test also tests reuse of the reader since it first reads
 // all the data then closes it, reopens and reads through again but just
 // reading the metadata the second time.
 // If it starts to fail during the pass that reads just the metadata it could
 // be a newly introduced reader reuse bug.
-func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
+func readTestDataWithOrderOpt(
+	t *testing.T,
+	r DataFileSetReader,
+	shard uint32,
+	timestamp time.Time,
+	entries []testEntry,
+	orderByIndex bool,
+) {
 	for _, underTest := range readTestTypes {
 		rOpenOpts := DataReaderOpenOptions{
 			Identifier: FileSetFileIdentifier{
 				Namespace:  testNs1ID,
-				Shard:      0,
+				Shard:      shard,
 				BlockStart: timestamp,
 			},
+			OrderedByIndex: orderByIndex,
 		}
 		err := r.Open(rOpenOpts)
 		require.NoError(t, err)
@@ -220,6 +251,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
 			tags.Close()
 			data.DecRef()
 			data.Finalize()
+
 		case readTestTypeMetadata:
 			id, tags, length, checksum, err := r.ReadMetadata()
 			require.NoError(t, err)
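`readTestData` now exercises every case twice: first in stored order, then with the expected entries re-sorted by series ID so they line up with what the indexed-order pass returns. The `testEntries` type exists to make that re-sort possible via `sort.Sort`; since Go 1.8 the same ordering could be written inline without the named type, a matter of taste rather than necessity:

```go
// Inline equivalent of the sort.Sort(sortedEntries) call above.
sortedEntries := append(make([]testEntry, 0, len(entries)), entries...)
sort.Slice(sortedEntries, func(i, j int) bool {
	return sortedEntries[i].id < sortedEntries[j].id
})
```

The named `sort.Interface` implementation does keep the comparison reusable by other tests in the package, so either choice is defensible.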
diff --git a/src/dbnode/persist/fs/seek_manager.go b/src/dbnode/persist/fs/seek_manager.go
index 0e8db86594..049eb11c73 100644
--- a/src/dbnode/persist/fs/seek_manager.go
+++ b/src/dbnode/persist/fs/seek_manager.go
@@ -467,7 +467,7 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s
 // 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker,
 //    and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine
 //    running the UpdateOpenlease() function that all inactive seekers have been returned and closed at
-//    which point the function will return sucessfully.
+//    which point the function will return successfully.
 func (m *seekerManager) UpdateOpenLease(
 	descriptor block.LeaseDescriptor,
 	state block.LeaseState,
diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go
index 7c15b6d56e..6c923deaa0 100644
--- a/src/dbnode/persist/fs/types.go
+++ b/src/dbnode/persist/fs/types.go
@@ -120,8 +120,10 @@ type DataFileSetReaderStatus struct {
 
 // DataReaderOpenOptions is options struct for the reader open method.
 type DataReaderOpenOptions struct {
-	Identifier  FileSetFileIdentifier
-	FileSetType persist.FileSetType
+	Identifier     FileSetFileIdentifier
+	FileSetType    persist.FileSetType
+	// OrderedByIndex enforces reading of series in the order of index (which is by series Id).
+	OrderedByIndex bool
 }
 
 // DataFileSetReader provides an unsynchronized reader for a TSDB file set
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 47de4c9d35..0f145ba574 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -1229,7 +1229,7 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error {
 	// We go through this error checking process to allow for partially successful flushes.
 	indexColdFlushError := onColdFlushNs.Done()
 	if indexColdFlushError == nil && onColdFlushDone != nil {
-		// Only evict rotated cold mutable index segments if the index cold flush was sucessful
+		// Only evict rotated cold mutable index segments if the index cold flush was successful
 		// or we will lose queryability of data that's still in mem.
 		indexColdFlushError = onColdFlushDone()
 	}
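Since `ReadMetadata` gets the same dispatch, `OrderedByIndex` also suits metadata-only scans that never touch the data payloads, e.g. listing every series checksum in ID order. A sketch under the same assumptions as the first example, with the reader already opened in ordered mode:

```go
// checksumsByID walks only the index entries; with OrderedByIndex set
// the iteration order is ascending by series ID.
func checksumsByID(r fs.DataFileSetReader) (map[string]uint32, error) {
	out := make(map[string]uint32)
	for {
		id, tags, _, checksum, err := r.ReadMetadata()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out[id.String()] = checksum
		tags.Close()
		id.Finalize() // IDs are cloned per entry and must be released
	}
}
```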