From cf04c4b5f87411c8af7d9fcc2929a2b5d96cc6e0 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 14 Jul 2020 10:00:15 +0300 Subject: [PATCH 01/23] [dbnode] Add OrderedByIndex option for DataFileSetReader.Open --- src/dbnode/persist/fs/index_read.go | 2 +- src/dbnode/persist/fs/read.go | 92 ++++++++++++++++++++++-- src/dbnode/persist/fs/read_test.go | 28 ++++---- src/dbnode/persist/fs/read_write_test.go | 36 +++++++++- src/dbnode/persist/fs/seek_manager.go | 2 +- src/dbnode/persist/fs/types.go | 6 +- src/dbnode/storage/namespace.go | 2 +- 7 files changed, 143 insertions(+), 25 deletions(-) diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index f86c2d9c1c..deda69685f 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -207,7 +207,7 @@ func (r *indexReader) ReadSegmentFileSet() ( ) success := false defer func() { - // Do not close opened files if read finishes sucessfully. + // Do not close opened files if read finishes successfully. if success { return } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 274bd7c255..890fc7ea88 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -50,6 +50,8 @@ var ( // errReadNotExpectedSize returned when the size of the next read does not match size specified by the index errReadNotExpectedSize = errors.New("next read not expected size") + + errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by Id") ) const ( @@ -99,6 +101,8 @@ type reader struct { shard uint32 volume int open bool + + orderedByIndex bool } // NewReader returns a new reader and expects all files to exist. Will read the @@ -151,6 +155,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { dataFilepath string ) + r.orderedByIndex = opts.OrderedByIndex + switch opts.FileSetType { case persist.FileSetSnapshotType: shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard) @@ -263,9 +269,13 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { r.Close() return err } - if err := r.readIndexAndSortByOffsetAsc(); err != nil { - r.Close() - return err + if opts.OrderedByIndex { + r.decoder.Reset(r.indexDecoderStream) + } else { + if err := r.readIndexAndSortByOffsetAsc(); err != nil { + r.Close() + return err + } } r.open = true @@ -282,7 +292,7 @@ func (r *reader) Status() DataFileSetReaderStatus { Shard: r.shard, Volume: r.volume, BlockStart: r.start, - BlockSize: time.Duration(r.blockSize), + BlockSize: r.blockSize, } } @@ -329,6 +339,10 @@ func (r *reader) readInfo(size int) error { } func (r *reader) readIndexAndSortByOffsetAsc() error { + if r.orderedByIndex { + return errUnexpectedSortByOffset + } + r.decoder.Reset(r.indexDecoderStream) for i := 0; i < r.entries; i++ { entry, err := r.decoder.DecodeIndexEntry(nil) @@ -344,6 +358,50 @@ func (r *reader) readIndexAndSortByOffsetAsc() error { } func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + if r.orderedByIndex { + return r.readInIndexedOrder() + } + return r.readInStoredOrder() +} + +func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + if r.entriesRead >= r.entries { + return nil, nil, nil, 0, io.EOF + } + + entry, err := r.decoder.DecodeIndexEntry(nil) + if err != nil { + return nil, nil, nil, 0, err + } + + var data checked.Bytes + if r.bytesPool != nil { + data = r.bytesPool.Get(int(entry.Size)) + data.IncRef() + defer data.DecRef() + } else 
{ + data = checked.NewBytes(make([]byte, 0, entry.Size), nil) + data.IncRef() + defer data.DecRef() + } + + data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size]) + + // NB(r): _must_ check the checksum against known checksum as the data + // file might not have been verified if we haven't read through the file yet. + if entry.Checksum != int64(digest.Checksum(data.Bytes())) { + return nil, nil, nil, 0, errSeekChecksumMismatch + } + + id := r.entryClonedID(entry.ID) + tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) + + r.entriesRead++ + + return id, tags, data, uint32(entry.Checksum), nil +} + +func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries { // Have not read the index yet, this is required when reading // data as we need each index entry in order by by the offset ascending @@ -386,6 +444,32 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, err } func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) { + if r.orderedByIndex { + return r.readMetadataInIndexedOrder() + } + return r.readMetadataInStoredOrder() +} + +func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) { + if r.entriesRead >= r.entries { + return nil, nil, 0, 0, io.EOF + } + + entry, err := r.decoder.DecodeIndexEntry(nil) + if err != nil { + return nil, nil, 0, 0, err + } + + id := r.entryClonedID(entry.ID) + tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) + length := int(entry.Size) + checksum := uint32(entry.Checksum) + + r.metadataRead++ + return id, tags, length, checksum, nil +} + +func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) { if r.metadataRead >= r.entries { return nil, nil, 0, 0, io.EOF } diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go index 5f6c3ca6aa..15eb299eaa 100644 --- a/src/dbnode/persist/fs/read_test.go +++ b/src/dbnode/persist/fs/read_test.go @@ -144,7 +144,7 @@ func TestReadEmptyIndexUnreadData(t *testing.T) { assert.NoError(t, err) _, _, _, _, err = r.Read() - assert.Error(t, err) + assert.Equal(t, io.EOF, err) assert.NoError(t, r.Close()) } @@ -311,7 +311,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) { BlockSize: testBlockSize, Identifier: FileSetFileIdentifier{ Namespace: testNs1ID, - Shard: uint32(shard), + Shard: shard, BlockStart: start, }, } @@ -350,11 +350,11 @@ func TestReadOpenDigestOfDigestMismatch(t *testing.T) { testReadOpen( t, map[string][]byte{ - infoFileSuffix: []byte{0x1}, - indexFileSuffix: []byte{0x2}, - dataFileSuffix: []byte{0x3}, - digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, - checkpointFileSuffix: []byte{0x12, 0x0, 0x7a, 0x0}, + infoFileSuffix: {0x1}, + indexFileSuffix: {0x2}, + dataFileSuffix: {0x3}, + digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, + checkpointFileSuffix: {0x12, 0x0, 0x7a, 0x0}, }, ) } @@ -363,11 +363,11 @@ func TestReadOpenInfoDigestMismatch(t *testing.T) { testReadOpen( t, map[string][]byte{ - infoFileSuffix: []byte{0xa}, - indexFileSuffix: []byte{0x2}, - dataFileSuffix: []byte{0x3}, - digestFileSuffix: []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, - checkpointFileSuffix: []byte{0x13, 0x0, 0x7a, 0x0}, + infoFileSuffix: {0xa}, + indexFileSuffix: {0x2}, + dataFileSuffix: {0x3}, + digestFileSuffix: {0x2, 0x0, 0x2, 0x0, 0x3, 
0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0}, + checkpointFileSuffix: {0x13, 0x0, 0x7a, 0x0}, }, ) } @@ -388,8 +388,8 @@ func TestReadOpenIndexDigestMismatch(t *testing.T) { t, map[string][]byte{ infoFileSuffix: b, - indexFileSuffix: []byte{0xa}, - dataFileSuffix: []byte{0x3}, + indexFileSuffix: {0xa}, + dataFileSuffix: {0x3}, digestFileSuffix: digestOfDigest, checkpointFileSuffix: buf, }, diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index 521f9df19f..630167e29e 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -75,6 +75,20 @@ func (e testEntry) Tags() ident.Tags { return tags } +type testEntries []testEntry + +func (e testEntries) Less(i, j int) bool { + return e[i].id < e[j].id +} + +func (e testEntries) Len() int { + return len(e) +} + +func (e testEntries) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} + func newTestWriter(t *testing.T, filePathPrefix string) DataFileSetWriter { writer, err := NewWriter(testDefaultOpts. SetFilePathPrefix(filePathPrefix). @@ -158,20 +172,37 @@ var readTestTypes = []readTestType{ readTestTypeMetadata, } +func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) { + readTestDataWithOrderOpt(t, r, shard, timestamp, entries, false) + + sortedEntries := append(make(testEntries, 0, len(entries)), entries...) + sort.Sort(sortedEntries) + + readTestDataWithOrderOpt(t, r, shard, timestamp, sortedEntries, true) +} + // readTestData will test reading back the data matches what was written, // note that this test also tests reuse of the reader since it first reads // all the data then closes it, reopens and reads through again but just // reading the metadata the second time. // If it starts to fail during the pass that reads just the metadata it could // be a newly introduced reader reuse bug. -func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) { +func readTestDataWithOrderOpt( + t *testing.T, + r DataFileSetReader, + shard uint32, + timestamp time.Time, + entries []testEntry, + orderByIndex bool, +) { for _, underTest := range readTestTypes { rOpenOpts := DataReaderOpenOptions{ Identifier: FileSetFileIdentifier{ Namespace: testNs1ID, - Shard: 0, + Shard: shard, BlockStart: timestamp, }, + OrderedByIndex: orderByIndex, } err := r.Open(rOpenOpts) require.NoError(t, err) @@ -220,6 +251,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim tags.Close() data.DecRef() data.Finalize() + case readTestTypeMetadata: id, tags, length, checksum, err := r.ReadMetadata() require.NoError(t, err) diff --git a/src/dbnode/persist/fs/seek_manager.go b/src/dbnode/persist/fs/seek_manager.go index 0e8db86594..049eb11c73 100644 --- a/src/dbnode/persist/fs/seek_manager.go +++ b/src/dbnode/persist/fs/seek_manager.go @@ -467,7 +467,7 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s // 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker, // and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine // running the UpdateOpenlease() function that all inactive seekers have been returned and closed at -// which point the function will return sucessfully. +// which point the function will return successfully. 
func (m *seekerManager) UpdateOpenLease( descriptor block.LeaseDescriptor, state block.LeaseState, diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index cded4b51dc..b8fc7866d0 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -120,8 +120,10 @@ type DataFileSetReaderStatus struct { // DataReaderOpenOptions is options struct for the reader open method. type DataReaderOpenOptions struct { - Identifier FileSetFileIdentifier - FileSetType persist.FileSetType + Identifier FileSetFileIdentifier + FileSetType persist.FileSetType + // OrderedByIndex enforces reading of series in the order of index (which is by series Id). + OrderedByIndex bool } // DataFileSetReader provides an unsynchronized reader for a TSDB file set diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index e03c9cc6d5..11426e5cb2 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1226,7 +1226,7 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error { // We go through this error checking process to allow for partially successful flushes. indexColdFlushError := onColdFlushNs.Done() if indexColdFlushError == nil && onColdFlushDone != nil { - // Only evict rotated cold mutable index segments if the index cold flush was sucessful + // Only evict rotated cold mutable index segments if the index cold flush was successful // or we will lose queryability of data that's still in mem. indexColdFlushError = onColdFlushDone() } From 76f53797c22e797da4669ab8494f99f4912b1f43 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Wed, 22 Jul 2020 16:15:05 +0300 Subject: [PATCH 02/23] [dbnode] Cross block series reader --- src/dbnode/persist/fs/cross_block_reader.go | 156 ++++++++++++++++++++ src/dbnode/persist/fs/types.go | 12 ++ 2 files changed, 168 insertions(+) create mode 100644 src/dbnode/persist/fs/cross_block_reader.go diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go new file mode 100644 index 0000000000..7503ed9006 --- /dev/null +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -0,0 +1,156 @@ +package fs + +import ( + "bytes" + "container/heap" + "fmt" + "github.com/m3db/m3/src/x/checked" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/ident" + "io" + "time" +) + +var _ CrossBlockReader = (*crossBlockReader)(nil) + +type crossBlockReader struct { + dataFileSetReaders []DataFileSetReader + initialized bool + minHeap minHeap + err error +} + +type minHeapEntry struct { + dataFileSetReaderIndex int + id ident.ID + tags ident.TagIterator + data checked.Bytes + checksum uint32 +} + +var _ heap.Interface = (*minHeap)(nil) + +type minHeap []*minHeapEntry + +// NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders. +// Callers are responsible for closing the DataFileSetReaders. 
+func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (*crossBlockReader, error) { + var previousStart time.Time + for _, dataFileSetReader := range dataFileSetReaders { + currentStart := dataFileSetReader.Range().Start + if !currentStart.After(previousStart) { + return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart) + } + previousStart = currentStart + } + + return &crossBlockReader{dataFileSetReaders: dataFileSetReaders}, nil +} + +func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) { + if !r.initialized { + r.initialized = true + err := r.init() + if err != nil { + return nil, nil, nil, 0, err + } + } + + if len(r.minHeap) == 0 { + return nil, nil, nil, 0, io.EOF + } + + entry := heap.Pop(&r.minHeap).(*minHeapEntry) + if r.dataFileSetReaders[entry.dataFileSetReaderIndex] != nil { + nextEntry, err := r.readFromDataFileSet(entry.dataFileSetReaderIndex) + if err == io.EOF { + // will no longer read from this one + r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil + } else if err != nil { + return nil, nil, nil, 0, err + } else { + heap.Push(&r.minHeap, nextEntry) + } + } + + return entry.id, entry.tags, entry.data, entry.checksum, nil +} + +func (r *crossBlockReader) init() error { + r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders)) + + for i := range r.dataFileSetReaders { + entry, err := r.readFromDataFileSet(i) + if err == io.EOF { + continue + } + if err != nil { + return err + } + r.minHeap = append(r.minHeap, entry) + } + + heap.Init(&r.minHeap) + + return nil +} + +func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) { + id, tags, data, checksum, err := r.dataFileSetReaders[index].Read() + + if err == io.EOF { + return nil, err + } + + if err != nil { + closeErr := r.Close() + if closeErr != nil { + return nil, xerrors.NewMultiError().Add(err).Add(closeErr) + } + return nil, err + } + + return &minHeapEntry{ + dataFileSetReaderIndex: index, + id: id, + tags: tags, + data: data, + checksum: checksum, + }, nil +} + +func (r *crossBlockReader) Close() error { + for _, entry := range r.minHeap { + entry.id.Finalize() + entry.tags.Close() + } + return nil +} + +func (h minHeap) Len() int { + return len(h) +} + +func (h minHeap) Less(i, j int) bool { + idsCmp := bytes.Compare(h[i].id.Bytes(), h[j].id.Bytes()) + if idsCmp == 0 { + return h[i].dataFileSetReaderIndex < h[j].dataFileSetReaderIndex + } + return idsCmp < 0 +} + +func (h minHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *minHeap) Push(x interface{}) { + *h = append(*h, x.(*minHeapEntry)) +} + +func (h *minHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index b8fc7866d0..0394711684 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -586,3 +586,15 @@ type Segments interface { AbsoluteFilePaths() []string BlockStart() time.Time } + +// CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard, +// ordered by series id first, and block start next. +type CrossBlockReader interface { + io.Closer + + // Read returns the next id, data, checksum tuple or error, will return io.EOF after all DataFileSetReaders exhausted. 
+ // Note: make sure to finalize the ID, close the Tags and finalize the Data when done with + // them so they can be returned to their respective pools. + Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) + +} From 6499aca2dc33c0384fa2ededf9afa4ef5531ba48 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Wed, 22 Jul 2020 16:58:17 +0300 Subject: [PATCH 03/23] Assert on OrderedByIndex --- src/dbnode/persist/fs/cross_block_reader.go | 19 ++++++++++++++----- src/dbnode/persist/fs/read.go | 4 ++++ src/dbnode/persist/fs/types.go | 5 ++++- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 7503ed9006..10f111457d 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -3,15 +3,21 @@ package fs import ( "bytes" "container/heap" + "errors" "fmt" + "io" + "time" + "github.com/m3db/m3/src/x/checked" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" - "io" - "time" ) -var _ CrossBlockReader = (*crossBlockReader)(nil) +var ( + errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index") + _ CrossBlockReader = (*crossBlockReader)(nil) + _ heap.Interface = (*minHeap)(nil) +) type crossBlockReader struct { dataFileSetReaders []DataFileSetReader @@ -28,16 +34,19 @@ type minHeapEntry struct { checksum uint32 } -var _ heap.Interface = (*minHeap)(nil) - type minHeap []*minHeapEntry // NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders. +// DataFileSetReaders must be configured to return the data in the order of index, and must be +// provided in a slice sorted by block start time. // Callers are responsible for closing the DataFileSetReaders. func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (*crossBlockReader, error) { var previousStart time.Time for _, dataFileSetReader := range dataFileSetReaders { currentStart := dataFileSetReader.Range().Start + if !dataFileSetReader.IsOrderedByIndex() { + return nil, errReaderNotOrderedByIndex + } if !currentStart.After(previousStart) { return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart) } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 890fc7ea88..d198a10414 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -570,6 +570,10 @@ func (r *reader) MetadataRead() int { return r.metadataRead } +func (r *reader) IsOrderedByIndex() bool { + return r.orderedByIndex +} + func (r *reader) Close() error { // Close and prepare resources that are to be reused multiErr := xerrors.NewMultiError() diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 0394711684..167940b381 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -172,6 +172,9 @@ type DataFileSetReader interface { // MetadataRead returns the position of metadata read into the volume MetadataRead() int + + // IsOrderedByIndex returns true if the reader reads the data in the order of index. + IsOrderedByIndex() bool } // DataFileSetSeeker provides an out of order reader for a TSDB file set @@ -588,7 +591,7 @@ type Segments interface { } // CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard, -// ordered by series id first, and block start next. 
+// ordered by series id first, and block start time next. type CrossBlockReader interface { io.Closer From 92d77587b268284c53d522776ffa4865a0d81552 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Thu, 23 Jul 2020 09:56:08 +0300 Subject: [PATCH 04/23] Tests --- src/dbnode/persist/fs/cross_block_reader.go | 2 +- .../persist/fs/cross_block_reader_test.go | 134 ++++++++++++++++++ src/dbnode/persist/fs/fs_mock.go | 14 ++ 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/dbnode/persist/fs/cross_block_reader_test.go diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 10f111457d..955018b42d 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -43,10 +43,10 @@ type minHeap []*minHeapEntry func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (*crossBlockReader, error) { var previousStart time.Time for _, dataFileSetReader := range dataFileSetReaders { - currentStart := dataFileSetReader.Range().Start if !dataFileSetReader.IsOrderedByIndex() { return nil, errReaderNotOrderedByIndex } + currentStart := dataFileSetReader.Range().Start if !currentStart.After(previousStart) { return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart) } diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go new file mode 100644 index 0000000000..4aec7130bb --- /dev/null +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -0,0 +1,134 @@ +package fs + +import ( + "fmt" + "io" + "testing" + "time" + + "github.com/m3db/m3/src/x/ident" + xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + dfsReader := NewMockDataFileSetReader(ctrl) + dfsReader.EXPECT().IsOrderedByIndex().Return(false) + + _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}) + + assert.Equal(t, errReaderNotOrderedByIndex, err) +} + +func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + now := time.Now().Truncate(time.Hour) + dfsReader1 := NewMockDataFileSetReader(ctrl) + dfsReader1.EXPECT().IsOrderedByIndex().Return(true) + dfsReader1.EXPECT().Range().Return(xtime.Range{Start: now}) + + later := now.Add(time.Hour) + dfsReader2 := NewMockDataFileSetReader(ctrl) + dfsReader2.EXPECT().IsOrderedByIndex().Return(true) + dfsReader2.EXPECT().Range().Return(xtime.Range{Start: later}) + + _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}) + + expectedErr := fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", later, now) + + assert.Equal(t, expectedErr, err) +} + +func TestCrossBlockReader(t *testing.T) { + tests := []struct { + name string + blockSeriesIds [][]string + }{ + {"no readers", [][]string{}}, + {"empty readers", [][]string{{}, {}, {}}}, + {"one reader, one series", [][]string{{"id1"}}}, + {"one reader, many series", [][]string{{"id1", "id2", "id3"}}}, + {"many readers with same series", [][]string{{"id1"}, {"id1"}, {"id1"}}}, + {"many readers with different series", [][]string{{"id1"}, {"id2"}, {"id3"}}}, + {"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}}, + {"complex case", [][]string{{"id2", 
"id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testCrossBlockReader(t, tt.blockSeriesIds) + }) + } +} + +func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + now := time.Now().Truncate(time.Hour) + var dfsReaders []DataFileSetReader + expectedCount := 0 + + for blockIndex, ids := range blockSeriesIds { + dfsReader := NewMockDataFileSetReader(ctrl) + dfsReader.EXPECT().IsOrderedByIndex().Return(true) + dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}) + + for j, id := range ids { + tags := ident.NewTags(ident.StringTag("foo", string(j))) + data := checkedBytes([]byte{byte(j)}) + checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions + dfsReader.EXPECT().Read().Return(ident.StringID(id), ident.NewTagsIterator(tags), data, checksum, nil) + } + + dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF) + + dfsReaders = append(dfsReaders, dfsReader) + expectedCount += len(ids) + } + + cbReader, err := NewCrossBlockReader(dfsReaders) + require.NoError(t, err) + defer cbReader.Close() + + actualCount := 0 + previousId := "" + var previousBlockIndex uint32 + for { + id, tags, data, checksum, err := cbReader.Read() + if err == io.EOF { + break + } + require.NoError(t, err) + + strId := id.String() + id.Finalize() + blockIndex := checksum // see the comment above + assert.True(t, strId >= previousId, "series must be read in non-decreasing id order") + if strId == previousId { + assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order") + } + + assert.NotNil(t, tags) + tags.Close() + + assert.NotNil(t, data) + data.DecRef() + data.Finalize() + + previousId = strId + previousBlockIndex = blockIndex + + actualCount++ + } + + assert.Equal(t, expectedCount, actualCount, "count of series read") +} diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 084b8e2afc..e4ae7c993a 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -201,6 +201,20 @@ func (mr *MockDataFileSetReaderMockRecorder) EntriesRead() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EntriesRead", reflect.TypeOf((*MockDataFileSetReader)(nil).EntriesRead)) } +// IsOrderedByIndex mocks base method +func (m *MockDataFileSetReader) IsOrderedByIndex() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsOrderedByIndex") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsOrderedByIndex indicates an expected call of IsOrderedByIndex +func (mr *MockDataFileSetReaderMockRecorder) IsOrderedByIndex() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsOrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).IsOrderedByIndex)) +} + // MetadataRead mocks base method func (m *MockDataFileSetReader) MetadataRead() int { m.ctrl.T.Helper() From 1211434745c0d561b93413c9de5da49ad642518f Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Thu, 23 Jul 2020 09:58:22 +0300 Subject: [PATCH 05/23] Mocks --- src/dbnode/generated/mocks/generate.go | 2 +- src/dbnode/persist/fs/fs_mock.go | 57 +++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go index 0dc98bc96b..5f0a7bff07 100644 --- 
a/src/dbnode/generated/mocks/generate.go +++ b/src/dbnode/generated/mocks/generate.go @@ -20,7 +20,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode) -//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" +//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go" //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest" //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go" diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index e4ae7c993a..722d39ab0a 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith) +// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader) // Copyright (c) 2020 Uber Technologies, Inc. 
//
@@ -1276,3 +1276,58 @@ func (mr *MockMergeWithMockRecorder) Read(arg0, arg1, arg2, arg3 interface{}) *g
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockMergeWith)(nil).Read), arg0, arg1, arg2, arg3)
 }
+
+// MockCrossBlockReader is a mock of CrossBlockReader interface
+type MockCrossBlockReader struct {
+	ctrl     *gomock.Controller
+	recorder *MockCrossBlockReaderMockRecorder
+}
+
+// MockCrossBlockReaderMockRecorder is the mock recorder for MockCrossBlockReader
+type MockCrossBlockReaderMockRecorder struct {
+	mock *MockCrossBlockReader
+}
+
+// NewMockCrossBlockReader creates a new mock instance
+func NewMockCrossBlockReader(ctrl *gomock.Controller) *MockCrossBlockReader {
+	mock := &MockCrossBlockReader{ctrl: ctrl}
+	mock.recorder = &MockCrossBlockReaderMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockCrossBlockReader) EXPECT() *MockCrossBlockReaderMockRecorder {
+	return m.recorder
+}
+
+// Close mocks base method
+func (m *MockCrossBlockReader) Close() error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Close")
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Close indicates an expected call of Close
+func (mr *MockCrossBlockReaderMockRecorder) Close() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCrossBlockReader)(nil).Close))
+}
+
+// Read mocks base method
+func (m *MockCrossBlockReader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Read")
+	ret0, _ := ret[0].(ident.ID)
+	ret1, _ := ret[1].(ident.TagIterator)
+	ret2, _ := ret[2].(checked.Bytes)
+	ret3, _ := ret[3].(uint32)
+	ret4, _ := ret[4].(error)
+	return ret0, ret1, ret2, ret3, ret4
+}
+
+// Read indicates an expected call of Read
+func (mr *MockCrossBlockReaderMockRecorder) Read() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockCrossBlockReader)(nil).Read))
+}

From f3dead9ddb74606b254ba9c77bacf79b09f17244 Mon Sep 17 00:00:00 2001
From: Linas Medziunas
Date: Thu, 23 Jul 2020 10:37:56 +0300
Subject: [PATCH 06/23] Don't test just the happy path

---
 src/dbnode/persist/fs/cross_block_reader.go |  3 +++
 .../persist/fs/cross_block_reader_test.go   | 25 ++++++++++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go
index 955018b42d..05008660bc 100644
--- a/src/dbnode/persist/fs/cross_block_reader.go
+++ b/src/dbnode/persist/fs/cross_block_reader.go
@@ -132,7 +132,10 @@ func (r *crossBlockReader) Close() error {
 	for _, entry := range r.minHeap {
 		entry.id.Finalize()
 		entry.tags.Close()
+		entry.data.DecRef()
+		entry.data.Finalize()
 	}
+	r.minHeap = r.minHeap[:0]
 	return nil
 }
diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go
index 4aec7130bb..82c3a2540f 100644
--- a/src/dbnode/persist/fs/cross_block_reader_test.go
+++ b/src/dbnode/persist/fs/cross_block_reader_test.go
@@ -1,6 +1,7 @@
 package fs
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"testing"
 	"time"
@@ -14,6 +15,8 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+var expectedError = errors.New("expected error")
+
 func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) {
 	ctrl := xtest.NewController(t)
 	defer ctrl.Finish()
@@ -60,6 +63,7 @@ func TestCrossBlockReader(t 
*testing.T) { {"many readers with different series", [][]string{{"id1"}, {"id2"}, {"id3"}}}, {"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}}, {"complex case", [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}}, + {"reader error", [][]string{{"id1", "id2"}, {"id1", "error"}}}, } for _, tt := range tests { @@ -82,14 +86,22 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { dfsReader.EXPECT().IsOrderedByIndex().Return(true) dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}) + blockHasError := false for j, id := range ids { tags := ident.NewTags(ident.StringTag("foo", string(j))) data := checkedBytes([]byte{byte(j)}) checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions - dfsReader.EXPECT().Read().Return(ident.StringID(id), ident.NewTagsIterator(tags), data, checksum, nil) + if id == "error" { + dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), expectedError) + blockHasError = true + } else { + dfsReader.EXPECT().Read().Return(ident.StringID(id), ident.NewTagsIterator(tags), data, checksum, nil) + } } - dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF) + if !blockHasError { + dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF).MaxTimes(1) + } dfsReaders = append(dfsReaders, dfsReader) expectedCount += len(ids) @@ -99,6 +111,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { require.NoError(t, err) defer cbReader.Close() + hadError := false actualCount := 0 previousId := "" var previousBlockIndex uint32 @@ -107,6 +120,10 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { if err == io.EOF { break } + if err == expectedError { + hadError = true + break + } require.NoError(t, err) strId := id.String() @@ -130,5 +147,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { actualCount++ } - assert.Equal(t, expectedCount, actualCount, "count of series read") + if !hadError { + assert.Equal(t, expectedCount, actualCount, "count of series read") + } } From 3bddd14a2f6bff590091997a4417158479038c87 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Sat, 25 Jul 2020 21:54:32 +0300 Subject: [PATCH 07/23] Addressed review feedback --- src/dbnode/persist/fs/cross_block_reader.go | 32 +++++++++---------- .../persist/fs/cross_block_reader_test.go | 2 +- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 05008660bc..ce28e610a0 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -15,7 +15,6 @@ import ( var ( errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index") - _ CrossBlockReader = (*crossBlockReader)(nil) _ heap.Interface = (*minHeap)(nil) ) @@ -26,21 +25,11 @@ type crossBlockReader struct { err error } -type minHeapEntry struct { - dataFileSetReaderIndex int - id ident.ID - tags ident.TagIterator - data checked.Bytes - checksum uint32 -} - -type minHeap []*minHeapEntry - // NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders. // DataFileSetReaders must be configured to return the data in the order of index, and must be // provided in a slice sorted by block start time. // Callers are responsible for closing the DataFileSetReaders. 
-func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (*crossBlockReader, error) { +func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockReader, error) { var previousStart time.Time for _, dataFileSetReader := range dataFileSetReaders { if !dataFileSetReader.IsOrderedByIndex() { @@ -112,11 +101,10 @@ func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) } if err != nil { - closeErr := r.Close() - if closeErr != nil { - return nil, xerrors.NewMultiError().Add(err).Add(closeErr) - } - return nil, err + multiErr := xerrors.NewMultiError(). + Add(err). + Add(r.Close()) + return nil, multiErr.FinalError() } return &minHeapEntry{ @@ -139,6 +127,16 @@ func (r *crossBlockReader) Close() error { return nil } +type minHeapEntry struct { + dataFileSetReaderIndex int + id ident.ID + tags ident.TagIterator + data checked.Bytes + checksum uint32 +} + +type minHeap []*minHeapEntry + func (h minHeap) Len() int { return len(h) } diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 82c3a2540f..e11e5cc345 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -120,7 +120,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { if err == io.EOF { break } - if err == expectedError { + if err != nil && err.Error() == expectedError.Error() { hadError = true break } From 8f68dd5f07ccb3cb6727de69786738d5f5aecbb3 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Sat, 25 Jul 2020 21:55:30 +0300 Subject: [PATCH 08/23] Legal stuff --- src/dbnode/persist/fs/cross_block_reader.go | 20 +++++++++++++++++++ .../persist/fs/cross_block_reader_test.go | 20 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index ce28e610a0..dfa5b61320 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package fs import ( diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index e11e5cc345..636fff5d9b 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package fs import ( From a456c95cd6a00118ece190c541efa3eecdb3f00e Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 4 Aug 2020 16:08:21 +0300 Subject: [PATCH 09/23] Group Read() results by id --- src/dbnode/persist/fs/cross_block_reader.go | 45 ++++++++++++++++--- .../persist/fs/cross_block_reader_test.go | 29 ++++++------ src/dbnode/persist/fs/types.go | 5 ++- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index dfa5b61320..ce2d49b429 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -40,6 +40,7 @@ var ( type crossBlockReader struct { dataFileSetReaders []DataFileSetReader + activeReadersCount int initialized bool minHeap minHeap err error @@ -62,20 +63,51 @@ func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockRead previousStart = currentStart } - return &crossBlockReader{dataFileSetReaders: dataFileSetReaders}, nil + return &crossBlockReader{dataFileSetReaders: dataFileSetReaders, activeReadersCount: len(dataFileSetReaders)}, nil } -func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) { +func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, datas []checked.Bytes, checksums []uint32, err error) { if !r.initialized { r.initialized = true err := r.init() if err != nil { - return nil, nil, nil, 0, err + return nil, nil, nil, nil, err } } + firstEntry, err := r.readOne() + if err != nil { + return nil, nil, nil, nil, err + } + + datas = make([]checked.Bytes, 0, r.activeReadersCount) + checksums = make([]uint32, 0, r.activeReadersCount) + + datas = append(datas, firstEntry.data) + checksums = append(checksums, firstEntry.checksum) + + for len(r.minHeap) > 0 && r.minHeap[0].id.Equal(firstEntry.id) { + nextEntry, err := r.readOne() + if err != nil { + // Finalize what was already read: + for _, data := range datas { + data.DecRef() + data.Finalize() + } + return nil, nil, nil, nil, err + } + nextEntry.id.Finalize() + nextEntry.tags.Close() + datas = append(datas, nextEntry.data) + checksums = append(checksums, nextEntry.checksum) + } + + return firstEntry.id, firstEntry.tags, datas, checksums, nil +} + +func (r *crossBlockReader) readOne() (*minHeapEntry, error) { if len(r.minHeap) == 0 { - return nil, nil, nil, 0, io.EOF + return nil, io.EOF 
} entry := heap.Pop(&r.minHeap).(*minHeapEntry) @@ -84,14 +116,15 @@ func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, data che if err == io.EOF { // will no longer read from this one r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil + r.activeReadersCount-- } else if err != nil { - return nil, nil, nil, 0, err + return nil, err } else { heap.Push(&r.minHeap, nextEntry) } } - return entry.id, entry.tags, entry.data, entry.checksum, nil + return entry, nil } func (r *crossBlockReader) init() error { diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 636fff5d9b..998d694594 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -134,9 +134,8 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { hadError := false actualCount := 0 previousId := "" - var previousBlockIndex uint32 for { - id, tags, data, checksum, err := cbReader.Read() + id, tags, datas, checksums, err := cbReader.Read() if err == io.EOF { break } @@ -148,23 +147,27 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { strId := id.String() id.Finalize() - blockIndex := checksum // see the comment above - assert.True(t, strId >= previousId, "series must be read in non-decreasing id order") - if strId == previousId { - assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order") - } + assert.True(t, strId > previousId, "series must be read in increasing id order") assert.NotNil(t, tags) tags.Close() - assert.NotNil(t, data) - data.DecRef() - data.Finalize() + assert.Equal(t, len(datas), len(checksums)) - previousId = strId - previousBlockIndex = blockIndex + var previousBlockIndex uint32 + for _, blockIndex := range checksums { // see the comment above + assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order") + previousBlockIndex = blockIndex + } + + for _, data := range datas { + assert.NotNil(t, data) + data.DecRef() + data.Finalize() + } - actualCount++ + previousId = strId + actualCount += len(datas) } if !hadError { diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 167940b381..f44e5be34f 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -595,9 +595,10 @@ type Segments interface { type CrossBlockReader interface { io.Closer - // Read returns the next id, data, checksum tuple or error, will return io.EOF after all DataFileSetReaders exhausted. + // Read returns the next distinct id and tags, plus slices with data and checksums from all blocks corresponding to + // the id returned. Returns io.EOF after all DataFileSetReaders exhausted. // Note: make sure to finalize the ID, close the Tags and finalize the Data when done with // them so they can be returned to their respective pools. 
- Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) + Read() (id ident.ID, tags ident.TagIterator, datas []checked.Bytes, checksums []uint32, err error) } From cb6073091f0c5e826cdee3e30eed73720e184024 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 4 Aug 2020 17:08:12 +0300 Subject: [PATCH 10/23] Remodel CrossBlockReader as an Iterator --- src/dbnode/persist/fs/cross_block_reader.go | 67 +++++++++++++------ .../persist/fs/cross_block_reader_test.go | 36 ++++------ src/dbnode/persist/fs/types.go | 16 ++++- 3 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index ce2d49b429..23b1080e60 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -34,13 +34,16 @@ import ( ) var ( - errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index") - _ heap.Interface = (*minHeap)(nil) + errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index") + errEmptyReader = errors.New("trying to read from empty reader") + _ heap.Interface = (*minHeap)(nil) ) type crossBlockReader struct { dataFileSetReaders []DataFileSetReader - activeReadersCount int + id ident.ID + tags ident.TagIterator + records []BlockRecord initialized bool minHeap minHeap err error @@ -63,51 +66,66 @@ func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockRead previousStart = currentStart } - return &crossBlockReader{dataFileSetReaders: dataFileSetReaders, activeReadersCount: len(dataFileSetReaders)}, nil + return &crossBlockReader{ + dataFileSetReaders: dataFileSetReaders, + records: make([]BlockRecord, 0, len(dataFileSetReaders)), + }, nil } -func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, datas []checked.Bytes, checksums []uint32, err error) { +func (r *crossBlockReader) Next() bool { if !r.initialized { - r.initialized = true err := r.init() if err != nil { - return nil, nil, nil, nil, err + r.err = err + return false } } + if len(r.minHeap) == 0 { + return false + } + firstEntry, err := r.readOne() if err != nil { - return nil, nil, nil, nil, err + r.err = err + return false } - datas = make([]checked.Bytes, 0, r.activeReadersCount) - checksums = make([]uint32, 0, r.activeReadersCount) + r.id = firstEntry.id + r.tags = firstEntry.tags - datas = append(datas, firstEntry.data) - checksums = append(checksums, firstEntry.checksum) + r.records = r.records[:0] + r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum}) for len(r.minHeap) > 0 && r.minHeap[0].id.Equal(firstEntry.id) { nextEntry, err := r.readOne() if err != nil { - // Finalize what was already read: - for _, data := range datas { - data.DecRef() - data.Finalize() + // Close the resources that were already read but not returned to the consumer: + r.id.Finalize() + r.tags.Close() + for _, record := range r.records { + record.Data.DecRef() + record.Data.Finalize() } - return nil, nil, nil, nil, err + r.records = r.records[:0] + r.err = err + return false } nextEntry.id.Finalize() nextEntry.tags.Close() - datas = append(datas, nextEntry.data) - checksums = append(checksums, nextEntry.checksum) + r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum}) } - return firstEntry.id, firstEntry.tags, datas, checksums, nil + return true +} + +func (r *crossBlockReader) Current() (ident.ID, 
ident.TagIterator, []BlockRecord) { + return r.id, r.tags, r.records } func (r *crossBlockReader) readOne() (*minHeapEntry, error) { if len(r.minHeap) == 0 { - return nil, io.EOF + return nil, errEmptyReader } entry := heap.Pop(&r.minHeap).(*minHeapEntry) @@ -116,7 +134,6 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) { if err == io.EOF { // will no longer read from this one r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil - r.activeReadersCount-- } else if err != nil { return nil, err } else { @@ -128,6 +145,7 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) { } func (r *crossBlockReader) init() error { + r.initialized = true r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders)) for i := range r.dataFileSetReaders { @@ -169,7 +187,12 @@ func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) }, nil } +func (r *crossBlockReader) Err() error { + return r.err +} + func (r *crossBlockReader) Close() error { + // Close the resources that were buffered in minHeap: for _, entry := range r.minHeap { entry.id.Finalize() entry.tags.Close() diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 998d694594..79350b3b1e 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -83,7 +83,8 @@ func TestCrossBlockReader(t *testing.T) { {"many readers with different series", [][]string{{"id1"}, {"id2"}, {"id3"}}}, {"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}}, {"complex case", [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}}, - {"reader error", [][]string{{"id1", "id2"}, {"id1", "error"}}}, + {"immediate reader error", [][]string{{"error"}}}, + {"reader error later", [][]string{{"id1", "id2"}, {"id1", "error"}}}, } for _, tt := range tests { @@ -131,19 +132,10 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { require.NoError(t, err) defer cbReader.Close() - hadError := false actualCount := 0 previousId := "" - for { - id, tags, datas, checksums, err := cbReader.Read() - if err == io.EOF { - break - } - if err != nil && err.Error() == expectedError.Error() { - hadError = true - break - } - require.NoError(t, err) + for cbReader.Next() { + id, tags, records := cbReader.Current() strId := id.String() id.Finalize() @@ -152,25 +144,23 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { assert.NotNil(t, tags) tags.Close() - assert.Equal(t, len(datas), len(checksums)) - var previousBlockIndex uint32 - for _, blockIndex := range checksums { // see the comment above + for _, record := range records { + blockIndex := record.Checksum // see the comment above assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order") previousBlockIndex = blockIndex - } - - for _, data := range datas { - assert.NotNil(t, data) - data.DecRef() - data.Finalize() + assert.NotNil(t, record.Data) + record.Data.DecRef() + record.Data.Finalize() } previousId = strId - actualCount += len(datas) + actualCount += len(records) } - if !hadError { + err = cbReader.Err() + if err == nil || err.Error() != expectedError.Error() { + require.NoError(t, cbReader.Err()) assert.Equal(t, expectedCount, actualCount, "count of series read") } } diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index f44e5be34f..44f2d988bc 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go 
@@ -590,15 +590,25 @@ type Segments interface { BlockStart() time.Time } +// BlockRecord wraps together M3TSZ data bytes with their checksum. +type BlockRecord struct { + Data checked.Bytes + Checksum uint32 +} + // CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard, // ordered by series id first, and block start time next. type CrossBlockReader interface { io.Closer - // Read returns the next distinct id and tags, plus slices with data and checksums from all blocks corresponding to - // the id returned. Returns io.EOF after all DataFileSetReaders exhausted. + Next() bool + + Err() error + + // Current returns distinct series id and tags, plus a slice with data and checksums from all blocks corresponding + // to that series (in temporal order). // Note: make sure to finalize the ID, close the Tags and finalize the Data when done with // them so they can be returned to their respective pools. - Read() (id ident.ID, tags ident.TagIterator, datas []checked.Bytes, checksums []uint32, err error) + Current() (ident.ID, ident.TagIterator, []BlockRecord) } From 4a2749d702cb00d7276f0af3bbc8c00b1358a477 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 4 Aug 2020 17:09:59 +0300 Subject: [PATCH 11/23] Mockgen --- src/dbnode/persist/fs/fs_mock.go | 46 +++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 722d39ab0a..f3ec69934d 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1314,20 +1314,46 @@ func (mr *MockCrossBlockReaderMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCrossBlockReader)(nil).Close)) } -// Read mocks base method -func (m *MockCrossBlockReader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { +// Current mocks base method +func (m *MockCrossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Read") + ret := m.ctrl.Call(m, "Current") ret0, _ := ret[0].(ident.ID) ret1, _ := ret[1].(ident.TagIterator) - ret2, _ := ret[2].(checked.Bytes) - ret3, _ := ret[3].(uint32) - ret4, _ := ret[4].(error) - return ret0, ret1, ret2, ret3, ret4 + ret2, _ := ret[2].([]BlockRecord) + return ret0, ret1, ret2 } -// Read indicates an expected call of Read -func (mr *MockCrossBlockReaderMockRecorder) Read() *gomock.Call { +// Current indicates an expected call of Current +func (mr *MockCrossBlockReaderMockRecorder) Current() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockCrossBlockReader)(nil).Current)) +} + +// Err mocks base method +func (m *MockCrossBlockReader) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockCrossBlockReaderMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockCrossBlockReader)(nil).Err)) +} + +// Next mocks base method +func (m *MockCrossBlockReader) Next() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockCrossBlockReaderMockRecorder) Next() *gomock.Call { mr.mock.ctrl.T.Helper() - return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockCrossBlockReader)(nil).Read)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockCrossBlockReader)(nil).Next)) } From 83845d21ddc16594aacb5e968b3a94591365166c Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Wed, 5 Aug 2020 09:39:57 +0300 Subject: [PATCH 12/23] Erase slice contents before draining them --- src/dbnode/persist/fs/cross_block_reader.go | 14 +++++++++++++- src/dbnode/persist/fs/types.go | 3 ++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 23b1080e60..e196be31a8 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -94,6 +94,12 @@ func (r *crossBlockReader) Next() bool { r.id = firstEntry.id r.tags = firstEntry.tags + // use empty var in inner loop with "for i := range" to have compiler use memclr optimization + // see: https://codereview.appspot.com/137880043 + var emptyRecord BlockRecord + for i := range r.records { + r.records[i] = emptyRecord + } r.records = r.records[:0] r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum}) @@ -107,6 +113,9 @@ func (r *crossBlockReader) Next() bool { record.Data.DecRef() record.Data.Finalize() } + for i := range r.records { + r.records[i] = emptyRecord + } r.records = r.records[:0] r.err = err return false @@ -193,12 +202,14 @@ func (r *crossBlockReader) Err() error { func (r *crossBlockReader) Close() error { // Close the resources that were buffered in minHeap: - for _, entry := range r.minHeap { + for i, entry := range r.minHeap { entry.id.Finalize() entry.tags.Close() entry.data.DecRef() entry.data.Finalize() + r.minHeap[i] = nil } + r.minHeap = r.minHeap[:0] return nil } @@ -237,6 +248,7 @@ func (h *minHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] + old[n-1] = nil *h = old[0 : n-1] return x } diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 44f2d988bc..21aabf53aa 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -608,7 +608,8 @@ type CrossBlockReader interface { // Current returns distinct series id and tags, plus a slice with data and checksums from all blocks corresponding // to that series (in temporal order). // Note: make sure to finalize the ID, close the Tags and finalize the Data when done with - // them so they can be returned to their respective pools. + // them so they can be returned to their respective pools. Also, []BlockRecord slice is being invalidated + // on each call to Next(). Current() (ident.ID, ident.TagIterator, []BlockRecord) } From 367551a96598841c62d9091c1c0e23884ad2e8e2 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Wed, 5 Aug 2020 11:29:49 +0300 Subject: [PATCH 13/23] Align with master --- src/dbnode/persist/fs/read.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 3957a7660a..7274ca07b2 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -389,7 +389,7 @@ func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Byte // NB(r): _must_ check the checksum against known checksum as the data // file might not have been verified if we haven't read through the file yet. 
- if entry.Checksum != int64(digest.Checksum(data.Bytes())) { + if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) { return nil, nil, nil, 0, errSeekChecksumMismatch } @@ -398,7 +398,7 @@ func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Byte r.entriesRead++ - return id, tags, data, uint32(entry.Checksum), nil + return id, tags, data, uint32(entry.DataChecksum), nil } func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { @@ -463,7 +463,7 @@ func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, id := r.entryClonedID(entry.ID) tags := r.entryClonedEncodedTagsIter(entry.EncodedTags) length := int(entry.Size) - checksum := uint32(entry.Checksum) + checksum := uint32(entry.DataChecksum) r.metadataRead++ return id, tags, length, checksum, nil From 3f3b4933a950c2eb86fe015d6b5e520fd6cee1f7 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Wed, 5 Aug 2020 13:17:13 +0300 Subject: [PATCH 14/23] Make a defensive copy of dataFileSetReaders --- src/dbnode/persist/fs/cross_block_reader.go | 2 +- src/dbnode/persist/fs/cross_block_reader_test.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index e196be31a8..5218cd3077 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -67,7 +67,7 @@ func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockRead } return &crossBlockReader{ - dataFileSetReaders: dataFileSetReaders, + dataFileSetReaders: append([]DataFileSetReader{}, dataFileSetReaders...), records: make([]BlockRecord, 0, len(dataFileSetReaders)), }, nil } diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 79350b3b1e..541fbe74cf 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -163,4 +163,8 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { require.NoError(t, cbReader.Err()) assert.Equal(t, expectedCount, actualCount, "count of series read") } + + for _, dfsReader := range dfsReaders { + assert.NotNil(t, dfsReader) + } } From 6a1912f01e1cee3de9af8f5e1e20327bbba62d05 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 10 Aug 2020 15:37:52 +0300 Subject: [PATCH 15/23] Fuse else / if --- src/dbnode/persist/fs/read.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 7274ca07b2..7c55277ad9 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -271,11 +271,9 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { } if opts.OrderedByIndex { r.decoder.Reset(r.indexDecoderStream) - } else { - if err := r.readIndexAndSortByOffsetAsc(); err != nil { - r.Close() - return err - } + } else if err := r.readIndexAndSortByOffsetAsc(); err != nil { + r.Close() + return err } r.open = true From 8081417be34d836d685945d0593f39b19c8f8b5f Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 10 Aug 2020 16:12:28 +0300 Subject: [PATCH 16/23] Address feedback --- src/dbnode/persist/fs/cross_block_reader.go | 5 ++++- src/dbnode/persist/fs/cross_block_reader_test.go | 6 +++--- src/dbnode/persist/fs/fs_mock.go | 6 +++--- src/dbnode/persist/fs/read.go | 2 +- src/dbnode/persist/fs/types.go | 13 +++++++------ 5 files changed, 18 insertions(+), 14 
deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 5218cd3077..93045157ec 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -56,7 +56,7 @@ type crossBlockReader struct { func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockReader, error) { var previousStart time.Time for _, dataFileSetReader := range dataFileSetReaders { - if !dataFileSetReader.IsOrderedByIndex() { + if !dataFileSetReader.OrderedByIndex() { return nil, errReaderNotOrderedByIndex } currentStart := dataFileSetReader.Range().Start @@ -120,8 +120,11 @@ func (r *crossBlockReader) Next() bool { r.err = err return false } + + // id and tags not needed for subsequent blocs because they are the same as in the first block nextEntry.id.Finalize() nextEntry.tags.Close() + r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum}) } diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 541fbe74cf..3cc13e6f62 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -73,7 +73,7 @@ func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) { func TestCrossBlockReader(t *testing.T) { tests := []struct { name string - blockSeriesIds [][]string + blockSeriesIDs [][]string }{ {"no readers", [][]string{}}, {"empty readers", [][]string{{}, {}, {}}}, @@ -89,7 +89,7 @@ func TestCrossBlockReader(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - testCrossBlockReader(t, tt.blockSeriesIds) + testCrossBlockReader(t, tt.blockSeriesIDs) }) } } @@ -146,7 +146,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { var previousBlockIndex uint32 for _, record := range records { - blockIndex := record.Checksum // see the comment above + blockIndex := record.DataChecksum // see the comment above assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order") previousBlockIndex = blockIndex assert.NotNil(t, record.Data) diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index f3ec69934d..6e40c6b14e 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -202,9 +202,9 @@ func (mr *MockDataFileSetReaderMockRecorder) EntriesRead() *gomock.Call { } // IsOrderedByIndex mocks base method -func (m *MockDataFileSetReader) IsOrderedByIndex() bool { +func (m *MockDataFileSetReader) OrderedByIndex() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsOrderedByIndex") + ret := m.ctrl.Call(m, "OrderedByIndex") ret0, _ := ret[0].(bool) return ret0 } @@ -212,7 +212,7 @@ func (m *MockDataFileSetReader) IsOrderedByIndex() bool { // IsOrderedByIndex indicates an expected call of IsOrderedByIndex func (mr *MockDataFileSetReaderMockRecorder) IsOrderedByIndex() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsOrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).IsOrderedByIndex)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).OrderedByIndex)) } // MetadataRead mocks base method diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index a3e2e02e95..bae654234c 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -568,7 +568,7 @@ func (r *reader) MetadataRead() 
int { return r.metadataRead } -func (r *reader) IsOrderedByIndex() bool { +func (r *reader) OrderedByIndex() bool { return r.orderedByIndex } diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 9ffb9418d7..dadd5e1e5f 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -173,8 +173,8 @@ type DataFileSetReader interface { // MetadataRead returns the position of metadata read into the volume MetadataRead() int - // IsOrderedByIndex returns true if the reader reads the data in the order of index. - IsOrderedByIndex() bool + // OrderedByIndex returns true if the reader reads the data in the order of index. + OrderedByIndex() bool } // DataFileSetSeeker provides an out of order reader for a TSDB file set @@ -607,7 +607,7 @@ type Segments interface { // BlockRecord wraps together M3TSZ data bytes with their checksum. type BlockRecord struct { Data checked.Bytes - Checksum uint32 + DataChecksum uint32 } // CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard, @@ -615,15 +615,16 @@ type BlockRecord struct { type CrossBlockReader interface { io.Closer + // Next advances to the next data record and returns true, or returns false if no more data exists. Next() bool + // Err returns the last error encountered (if any). Err() error // Current returns distinct series id and tags, plus a slice with data and checksums from all blocks corresponding // to that series (in temporal order). // Note: make sure to finalize the ID, close the Tags and finalize the Data when done with - // them so they can be returned to their respective pools. Also, []BlockRecord slice is being invalidated - // on each call to Next(). + // them so they can be returned to their respective pools. Also, the []BlockRecord slice and its underlying data + // are invalidated on each call to Next().
Current() (ident.ID, ident.TagIterator, []BlockRecord) - } From dd207d14aeda93f618e0f72db31287c414ff6c8e Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 10 Aug 2020 16:47:00 +0300 Subject: [PATCH 17/23] Mockgen --- src/dbnode/persist/fs/fs_mock.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 6e40c6b14e..583e801116 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -201,20 +201,6 @@ func (mr *MockDataFileSetReaderMockRecorder) EntriesRead() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EntriesRead", reflect.TypeOf((*MockDataFileSetReader)(nil).EntriesRead)) } -// IsOrderedByIndex mocks base method -func (m *MockDataFileSetReader) OrderedByIndex() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "OrderedByIndex") - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsOrderedByIndex indicates an expected call of IsOrderedByIndex -func (mr *MockDataFileSetReaderMockRecorder) IsOrderedByIndex() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).OrderedByIndex)) -} - // MetadataRead mocks base method func (m *MockDataFileSetReader) MetadataRead() int { m.ctrl.T.Helper() @@ -243,6 +229,20 @@ func (mr *MockDataFileSetReaderMockRecorder) Open(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockDataFileSetReader)(nil).Open), arg0) } +// OrderedByIndex mocks base method +func (m *MockDataFileSetReader) OrderedByIndex() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OrderedByIndex") + ret0, _ := ret[0].(bool) + return ret0 +} + +// OrderedByIndex indicates an expected call of OrderedByIndex +func (mr *MockDataFileSetReaderMockRecorder) OrderedByIndex() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).OrderedByIndex)) +} + // Range mocks base method func (m *MockDataFileSetReader) Range() time0.Range { m.ctrl.T.Helper() From 21e4b137b8ef618ecd47e90cdee522cd299a1b80 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 10 Aug 2020 17:22:33 +0300 Subject: [PATCH 18/23] Fix test --- src/dbnode/persist/fs/cross_block_reader_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 3cc13e6f62..0bf36dd08a 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -42,7 +42,7 @@ func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) { defer ctrl.Finish() dfsReader := NewMockDataFileSetReader(ctrl) - dfsReader.EXPECT().IsOrderedByIndex().Return(false) + dfsReader.EXPECT().OrderedByIndex().Return(false) _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}) @@ -55,12 +55,12 @@ func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) { now := time.Now().Truncate(time.Hour) dfsReader1 := NewMockDataFileSetReader(ctrl) - dfsReader1.EXPECT().IsOrderedByIndex().Return(true) + dfsReader1.EXPECT().OrderedByIndex().Return(true) dfsReader1.EXPECT().Range().Return(xtime.Range{Start: now}) later := now.Add(time.Hour) dfsReader2 := NewMockDataFileSetReader(ctrl) - dfsReader2.EXPECT().IsOrderedByIndex().Return(true) + 
dfsReader2.EXPECT().OrderedByIndex().Return(true) dfsReader2.EXPECT().Range().Return(xtime.Range{Start: later}) _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}) @@ -104,7 +104,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { for blockIndex, ids := range blockSeriesIds { dfsReader := NewMockDataFileSetReader(ctrl) - dfsReader.EXPECT().IsOrderedByIndex().Return(true) + dfsReader.EXPECT().OrderedByIndex().Return(true) dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}) blockHasError := false From a766cf4548918559caa8a645ff2f5d4f76b94fde Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 10 Aug 2020 17:34:49 +0300 Subject: [PATCH 19/23] Better conversion to string --- src/dbnode/persist/fs/cross_block_reader_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 0bf36dd08a..35d2e21f45 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io" + "strconv" "testing" "time" @@ -109,7 +110,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { blockHasError := false for j, id := range ids { - tags := ident.NewTags(ident.StringTag("foo", string(j))) + tags := ident.NewTags(ident.StringTag("foo", strconv.Itoa(j))) data := checkedBytes([]byte{byte(j)}) checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions if id == "error" { From 29f3b259482494380ec6ffda485c8c86d566938e Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 11 Aug 2020 09:13:31 +0300 Subject: [PATCH 20/23] Address review feedback --- src/dbnode/persist/fs/cross_block_reader.go | 33 +++++++++++---------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 93045157ec..4c3f0e76e7 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -34,7 +34,7 @@ import ( ) var ( - errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index") + errReaderNotOrderedByIndex = errors.New("crossBlockReader can only use DataFileSetReaders ordered by index") errEmptyReader = errors.New("trying to read from empty reader") _ heap.Interface = (*minHeap)(nil) ) @@ -44,7 +44,7 @@ type crossBlockReader struct { id ident.ID tags ident.TagIterator records []BlockRecord - initialized bool + started bool minHeap minHeap err error } @@ -67,18 +67,27 @@ func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockRead } return &crossBlockReader{ - dataFileSetReaders: append([]DataFileSetReader{}, dataFileSetReaders...), + dataFileSetReaders: append(make([]DataFileSetReader, 0, len(dataFileSetReaders)), dataFileSetReaders...), records: make([]BlockRecord, 0, len(dataFileSetReaders)), }, nil } func (r *crossBlockReader) Next() bool { - if !r.initialized { - err := r.init() - if err != nil { - r.err = err + if r.err != nil { + return false + } + + var emptyRecord BlockRecord + if !r.started { + if r.err = r.start(); r.err != nil { return false } + } else { + // use empty var in inner loop with "for i := range" to have compiler use memclr optimization + // see: https://codereview.appspot.com/137880043 + for i := range r.records { + 
r.records[i] = emptyRecord + } } if len(r.minHeap) == 0 { @@ -94,12 +103,6 @@ func (r *crossBlockReader) Next() bool { r.id = firstEntry.id r.tags = firstEntry.tags - // use empty var in inner loop with "for i := range" to have compiler use memclr optimization - // see: https://codereview.appspot.com/137880043 - var emptyRecord BlockRecord - for i := range r.records { - r.records[i] = emptyRecord - } r.records = r.records[:0] r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum}) @@ -156,8 +159,8 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) { return entry, nil } -func (r *crossBlockReader) init() error { - r.initialized = true +func (r *crossBlockReader) start() error { + r.started = true r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders)) for i := range r.dataFileSetReaders { From 28c0cabc9ed0cc0cb48533cebf1312f4b7aa5810 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 11 Aug 2020 09:39:54 +0300 Subject: [PATCH 21/23] Check for duplicate ids --- src/dbnode/persist/fs/cross_block_reader.go | 3 +++ src/dbnode/persist/fs/cross_block_reader_test.go | 12 +++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 4c3f0e76e7..57f17dcf1e 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -151,6 +151,9 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) { r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil } else if err != nil { return nil, err + } else if bytes.Equal(nextEntry.id.Bytes(), entry.id.Bytes()) { + return nil, fmt.Errorf("duplicate id %s on block starting at %s", + entry.id, r.dataFileSetReaders[entry.dataFileSetReaderIndex].Range().Start) } else { heap.Push(&r.minHeap, nextEntry) } diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 35d2e21f45..9d4fc5f7b3 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "strconv" + "strings" "testing" "time" @@ -85,6 +86,7 @@ func TestCrossBlockReader(t *testing.T) { {"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}}, {"complex case", [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}}, {"immediate reader error", [][]string{{"error"}}}, + {"duplicate id within a reader", [][]string{{"id1", "id2"}, {"id2", "id2"}}}, {"reader error later", [][]string{{"id1", "id2"}, {"id1", "error"}}}, } @@ -106,7 +108,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { for blockIndex, ids := range blockSeriesIds { dfsReader := NewMockDataFileSetReader(ctrl) dfsReader.EXPECT().OrderedByIndex().Return(true) - dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}) + dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}).AnyTimes() blockHasError := false for j, id := range ids { @@ -145,10 +147,10 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { assert.NotNil(t, tags) tags.Close() - var previousBlockIndex uint32 + previousBlockIndex := -1 for _, record := range records { - blockIndex := record.DataChecksum // see the comment above - assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order") + blockIndex := int(record.DataChecksum) 
// see the comment above + assert.True(t, blockIndex > previousBlockIndex, "same id blocks must be read in temporal order") previousBlockIndex = blockIndex assert.NotNil(t, record.Data) record.Data.DecRef() @@ -160,7 +162,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { } err = cbReader.Err() - if err == nil || err.Error() != expectedError.Error() { + if err == nil || (err.Error() != expectedError.Error() && !strings.HasPrefix(err.Error(), "duplicate id")) { require.NoError(t, cbReader.Err()) assert.Equal(t, expectedCount, actualCount, "count of series read") } From 79ee91bcaf1be221ba4ee78d8fff3266de639ec8 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 11 Aug 2020 10:47:55 +0300 Subject: [PATCH 22/23] Further feedback --- src/dbnode/persist/fs/cross_block_reader.go | 2 +- .../persist/fs/cross_block_reader_test.go | 89 ++++++++++++++----- src/dbnode/persist/fs/types.go | 1 + 3 files changed, 70 insertions(+), 22 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 57f17dcf1e..72914f604e 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -124,7 +124,7 @@ func (r *crossBlockReader) Next() bool { return false } - // id and tags not needed for subsequent blocs because they are the same as in the first block + // id and tags not needed for subsequent blocks because they are the same as in the first block nextEntry.id.Finalize() nextEntry.tags.Close() diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 9d4fc5f7b3..120800bc9d 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -76,34 +76,79 @@ func TestCrossBlockReader(t *testing.T) { tests := []struct { name string blockSeriesIDs [][]string + expectedIDs []string }{ - {"no readers", [][]string{}}, - {"empty readers", [][]string{{}, {}, {}}}, - {"one reader, one series", [][]string{{"id1"}}}, - {"one reader, many series", [][]string{{"id1", "id2", "id3"}}}, - {"many readers with same series", [][]string{{"id1"}, {"id1"}, {"id1"}}}, - {"many readers with different series", [][]string{{"id1"}, {"id2"}, {"id3"}}}, - {"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}}, - {"complex case", [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}}, - {"immediate reader error", [][]string{{"error"}}}, - {"duplicate id within a reader", [][]string{{"id1", "id2"}, {"id2", "id2"}}}, - {"reader error later", [][]string{{"id1", "id2"}, {"id1", "error"}}}, + { + name: "no readers", + blockSeriesIDs: [][]string{}, + expectedIDs: []string{}, + }, + { + name: "empty readers", + blockSeriesIDs: [][]string{{}, {}, {}}, + expectedIDs: []string{}, + }, + { + name: "one reader, one series", + blockSeriesIDs: [][]string{{"id1"}}, + expectedIDs: []string{"id1"}, + }, + { + name: "one reader, many series", + blockSeriesIDs: [][]string{{"id1", "id2", "id3"}}, + expectedIDs: []string{"id1", "id2", "id3"}, + }, + { + name: "many readers with same series", + blockSeriesIDs: [][]string{{"id1"}, {"id1"}, {"id1"}}, + expectedIDs: []string{"id1"}, + }, + { + name: "many readers with different series", + blockSeriesIDs: [][]string{{"id1"}, {"id2"}, {"id3"}}, + expectedIDs: []string{"id1", "id2", "id3"}, + }, + { + name: "many readers with unordered series", + blockSeriesIDs: [][]string{{"id3"}, {"id1"}, {"id2"}}, + expectedIDs: []string{"id1", 
"id2", "id3"}, + }, + { + name: "complex case", + blockSeriesIDs: [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}, + expectedIDs: []string{"id1", "id2", "id3", "id4", "id5"}, + }, + { + name: "duplicate ids within a reader", + blockSeriesIDs: [][]string{{"id1", "id2"}, {"id2", "id2"}}, + expectedIDs: []string{"id1"}, + }, + { + name: "immediate reader error", + blockSeriesIDs: [][]string{{"error"}}, + expectedIDs: []string{}, + }, + { + name: "reader error later", + blockSeriesIDs: [][]string{{"id1", "id2"}, {"id1", "error"}}, + expectedIDs: []string{}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - testCrossBlockReader(t, tt.blockSeriesIDs) + testCrossBlockReader(t, tt.blockSeriesIDs, tt.expectedIDs) }) } } -func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { +func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string, expectedIDs []string) { ctrl := xtest.NewController(t) defer ctrl.Finish() now := time.Now().Truncate(time.Hour) var dfsReaders []DataFileSetReader - expectedCount := 0 + expectedBlockCount := 0 for blockIndex, ids := range blockSeriesIds { dfsReader := NewMockDataFileSetReader(ctrl) @@ -128,21 +173,21 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { } dfsReaders = append(dfsReaders, dfsReader) - expectedCount += len(ids) + expectedBlockCount += len(ids) } cbReader, err := NewCrossBlockReader(dfsReaders) require.NoError(t, err) defer cbReader.Close() - actualCount := 0 - previousId := "" + blockCount := 0 + seriesCount := 0 for cbReader.Next() { id, tags, records := cbReader.Current() strId := id.String() id.Finalize() - assert.True(t, strId > previousId, "series must be read in increasing id order") + assert.Equal(t, expectedIDs[seriesCount], strId) assert.NotNil(t, tags) tags.Close() @@ -157,14 +202,16 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) { record.Data.Finalize() } - previousId = strId - actualCount += len(records) + blockCount += len(records) + seriesCount++ } + assert.Equal(t, len(expectedIDs), seriesCount, "count of series read") + err = cbReader.Err() if err == nil || (err.Error() != expectedError.Error() && !strings.HasPrefix(err.Error(), "duplicate id")) { require.NoError(t, cbReader.Err()) - assert.Equal(t, expectedCount, actualCount, "count of series read") + assert.Equal(t, expectedBlockCount, blockCount, "count of blocks read") } for _, dfsReader := range dfsReaders { diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index dadd5e1e5f..32c1ce2a39 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -174,6 +174,7 @@ type DataFileSetReader interface { MetadataRead() int // OrderedByIndex returns true if the reader reads the data in the order of index. + // If false, the reader reads the data in the same order as it is stored in the data file. 
OrderedByIndex() bool } From e19830bc6be98f8a50fe1c06f2b8542082f55fd9 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 11 Aug 2020 16:26:58 +0300 Subject: [PATCH 23/23] Duplicate id is an invariant violation --- src/dbnode/persist/fs/cross_block_reader.go | 15 +++++++++++++-- src/dbnode/persist/fs/cross_block_reader_test.go | 7 ++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go index 72914f604e..7642653861 100644 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ b/src/dbnode/persist/fs/cross_block_reader.go @@ -31,6 +31,9 @@ import ( "github.com/m3db/m3/src/x/checked" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + + "go.uber.org/zap" ) var ( @@ -47,13 +50,14 @@ type crossBlockReader struct { started bool minHeap minHeap err error + iOpts instrument.Options } // NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders. // DataFileSetReaders must be configured to return the data in the order of index, and must be // provided in a slice sorted by block start time. // Callers are responsible for closing the DataFileSetReaders. -func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockReader, error) { +func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader, iOpts instrument.Options) (CrossBlockReader, error) { var previousStart time.Time for _, dataFileSetReader := range dataFileSetReaders { if !dataFileSetReader.OrderedByIndex() { @@ -69,6 +73,7 @@ func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockRead return &crossBlockReader{ dataFileSetReaders: append(make([]DataFileSetReader, 0, len(dataFileSetReaders)), dataFileSetReaders...), records: make([]BlockRecord, 0, len(dataFileSetReaders)), + iOpts: iOpts, }, nil } @@ -152,8 +157,14 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) { } else if err != nil { return nil, err } else if bytes.Equal(nextEntry.id.Bytes(), entry.id.Bytes()) { - return nil, fmt.Errorf("duplicate id %s on block starting at %s", + err := fmt.Errorf("duplicate id %s on block starting at %s", entry.id, r.dataFileSetReaders[entry.dataFileSetReaderIndex].Range().Start) + + instrument.EmitAndLogInvariantViolation(r.iOpts, func(l *zap.Logger) { + l.Error(err.Error()) + }) + + return nil, err } else { heap.Push(&r.minHeap, nextEntry) } diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go index 120800bc9d..819740fee6 100644 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ b/src/dbnode/persist/fs/cross_block_reader_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" @@ -46,7 +47,7 @@ func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) { dfsReader := NewMockDataFileSetReader(ctrl) dfsReader.EXPECT().OrderedByIndex().Return(false) - _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}) + _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}, instrument.NewTestOptions(t)) assert.Equal(t, errReaderNotOrderedByIndex, err) } @@ -65,7 +66,7 @@ func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) { dfsReader2.EXPECT().OrderedByIndex().Return(true) dfsReader2.EXPECT().Range().Return(xtime.Range{Start: later}) - _, err := 
NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}) + _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}, instrument.NewTestOptions(t)) expectedErr := fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", later, now) @@ -176,7 +177,7 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string, expectedIDs [ expectedBlockCount += len(ids) } - cbReader, err := NewCrossBlockReader(dfsReaders) + cbReader, err := NewCrossBlockReader(dfsReaders, instrument.NewTestOptions(t)) require.NoError(t, err) defer cbReader.Close()