[dbnode] Cross-block series reader #2481

Merged: 27 commits, Aug 11, 2020
Commits:

cf04c4b  [dbnode] Add OrderedByIndex option for DataFileSetReader.Open (linasm, Jul 14, 2020)
c55c21c  Merge branch 'large-tiles-aggregation' into linas/sorted-read (linasm, Jul 23, 2020)
76f5379  [dbnode] Cross block series reader (linasm, Jul 22, 2020)
6499aca  Assert on OrderedByIndex (linasm, Jul 22, 2020)
92d7758  Tests (linasm, Jul 23, 2020)
1211434  Mocks (linasm, Jul 23, 2020)
f3dead9  Dont test just the happy path (linasm, Jul 23, 2020)
3bddd14  Addressed review feedback (linasm, Jul 25, 2020)
8f68dd5  Legal stuff (linasm, Jul 25, 2020)
a456c95  Group Read() results by id (linasm, Aug 4, 2020)
cb60730  Remodel CrossBlockReader as an Iterator (linasm, Aug 4, 2020)
4a2749d  Mockgen (linasm, Aug 4, 2020)
83845d2  Erase slice contents before draining them (linasm, Aug 5, 2020)
0f41464  Merge branch 'large-tiles-aggregation' into linas/sorted-read (linasm, Aug 5, 2020)
367551a  Align with master (linasm, Aug 5, 2020)
f1402cf  Merge branch 'linas/sorted-read' into linasm/cross-block-series-reader (linasm, Aug 5, 2020)
3f3b493  Make a defensive copy of dataFileSetReaders (linasm, Aug 5, 2020)
6a1912f  Fuse else / if (linasm, Aug 10, 2020)
bbf26f9  Merge remote-tracking branch 'origin/linas/sorted-read' into linasm/c… (linasm, Aug 10, 2020)
8081417  Address feedback (linasm, Aug 10, 2020)
dd207d1  Mockgen (linasm, Aug 10, 2020)
21e4b13  Fix test (linasm, Aug 10, 2020)
a766cf4  Better conversion to string (linasm, Aug 10, 2020)
29f3b25  Address review feedback (linasm, Aug 11, 2020)
28c0cab  Check for duplicate ids (linasm, Aug 11, 2020)
79ee91b  Further feedback (linasm, Aug 11, 2020)
e19830b  Duplicate id is an invariant violation (linasm, Aug 11, 2020)
67 changes: 45 additions & 22 deletions src/dbnode/persist/fs/cross_block_reader.go
@@ -34,13 +34,16 @@ import (
 )

 var (
-	errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index")
-	_ heap.Interface = (*minHeap)(nil)
+	errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index")
+	errEmptyReader             = errors.New("trying to read from empty reader")
+	_ heap.Interface = (*minHeap)(nil)
 )

 type crossBlockReader struct {
 	dataFileSetReaders []DataFileSetReader
-	activeReadersCount int
+	id                 ident.ID
+	tags               ident.TagIterator
+	records            []BlockRecord
 	initialized        bool
 	minHeap            minHeap
 	err                error
@@ -63,51 +66,66 @@ func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (CrossBlockReader, error) {
 		previousStart = currentStart
 	}

-	return &crossBlockReader{dataFileSetReaders: dataFileSetReaders, activeReadersCount: len(dataFileSetReaders)}, nil
+	return &crossBlockReader{
+		dataFileSetReaders: dataFileSetReaders,
+		records:            make([]BlockRecord, 0, len(dataFileSetReaders)),
+	}, nil
 }

-func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, datas []checked.Bytes, checksums []uint32, err error) {
+func (r *crossBlockReader) Next() bool {
 	if !r.initialized {
-		r.initialized = true
 		err := r.init()
 		if err != nil {
-			return nil, nil, nil, nil, err
+			r.err = err
+			return false
 		}
 	}

+	if len(r.minHeap) == 0 {
+		return false
+	}
+
 	firstEntry, err := r.readOne()
 	if err != nil {
-		return nil, nil, nil, nil, err
+		r.err = err
+		return false
 	}

-	datas = make([]checked.Bytes, 0, r.activeReadersCount)
-	checksums = make([]uint32, 0, r.activeReadersCount)
+	r.id = firstEntry.id
+	r.tags = firstEntry.tags

-	datas = append(datas, firstEntry.data)
-	checksums = append(checksums, firstEntry.checksum)
+	r.records = r.records[:0]
+	r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum})

 	for len(r.minHeap) > 0 && r.minHeap[0].id.Equal(firstEntry.id) {
 		nextEntry, err := r.readOne()
 		if err != nil {
-			// Finalize what was already read:
-			for _, data := range datas {
-				data.DecRef()
-				data.Finalize()
+			// Close the resources that were already read but not returned to the consumer:
+			r.id.Finalize()
+			r.tags.Close()
+			for _, record := range r.records {
+				record.Data.DecRef()
+				record.Data.Finalize()
 			}
-			return nil, nil, nil, nil, err
+			r.records = r.records[:0]
+			r.err = err
+			return false
 		}
 		nextEntry.id.Finalize()
 		nextEntry.tags.Close()
-		datas = append(datas, nextEntry.data)
-		checksums = append(checksums, nextEntry.checksum)
+		r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum})
Review thread (resolved):

Collaborator: nit: is it worth having a sanity check to see that len(r.records) <= len(dataFileSetReaders)? Otherwise we necessarily have a duplicate ID in one of the readers.

Author (linasm): Good point, except that len(dataFileSetReaders) is not the best indicator of duplicate ids, because I invalidate (set to nil) already-exhausted dataFileSetReaders and don't track the number of still-valid ones. Instead, I implemented a more direct check for duplicate ids in readOne (and pushed it as a separate commit for clarity). I'm just not completely sure whether, for our use case, it would make more sense to return an error and fail the whole process, or to skip the duplicate. What do you think?

Author (linasm): As per our conversation, keeping the error approach, and reporting it as an invariant violation.
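For illustration, a sketch of what that readOne-level check could look like; it is hedged against the diff above, not the merged code, and fmt.Errorf stands in for whatever invariant-reporting helper the final commit actually uses:

// Illustrative only: after popping the smallest entry in readOne, peek at the
// next entry from the same reader. Each DataFileSetReader is ordered by id,
// so a duplicate id within one fileset must surface as the very next entry.
nextEntry, err := r.readFromDataFileSet(entry.dataFileSetReaderIndex)
if err == io.EOF {
	// This reader is exhausted; stop reading from it.
	r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil
} else if err != nil {
	return nil, err
} else if nextEntry.id.Equal(entry.id) {
	// Assumed error construction; reported as an invariant violation.
	return nil, fmt.Errorf("invariant violation: duplicate id %s in reader %d",
		entry.id, entry.dataFileSetReaderIndex)
} else {
	heap.Push(&r.minHeap, nextEntry)
}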

 	}

-	return firstEntry.id, firstEntry.tags, datas, checksums, nil
+	return true
 }

+func (r *crossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) {
+	return r.id, r.tags, r.records
+}
+
 func (r *crossBlockReader) readOne() (*minHeapEntry, error) {
 	if len(r.minHeap) == 0 {
-		return nil, io.EOF
+		return nil, errEmptyReader
 	}

 	entry := heap.Pop(&r.minHeap).(*minHeapEntry)
@@ -116,7 +134,6 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) {
 	if err == io.EOF {
 		// will no longer read from this one
 		r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil
-		r.activeReadersCount--
 	} else if err != nil {
 		return nil, err
 	} else {
@@ -128,6 +145,7 @@ func (r *crossBlockReader) readOne() (*minHeapEntry, error) {
 }

 func (r *crossBlockReader) init() error {
+	r.initialized = true
 	r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders))

 	for i := range r.dataFileSetReaders {
@@ -169,7 +187,12 @@ func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) {
 	}, nil
 }

+func (r *crossBlockReader) Err() error {
+	return r.err
+}
+
 func (r *crossBlockReader) Close() error {
+	// Close the resources that were buffered in minHeap:
 	for _, entry := range r.minHeap {
 		entry.id.Finalize()
 		entry.tags.Close()
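The core of the reader is a k-way merge over per-block filesets via container/heap, ordered by series id first and block start next (the guarantee CrossBlockReader documents in types.go below). A self-contained sketch of that comparator pattern follows; the entry type is a simplification assumed for illustration, not the actual minHeapEntry:

package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

// entry is a simplified stand-in for the reader's heap entries.
type entry struct {
	id         []byte
	blockStart int64
}

// entryHeap orders entries by series id first, block start time next.
type entryHeap []entry

func (h entryHeap) Len() int      { return len(h) }
func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h entryHeap) Less(i, j int) bool {
	if c := bytes.Compare(h[i].id, h[j].id); c != 0 {
		return c < 0
	}
	return h[i].blockStart < h[j].blockStart
}
func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() interface{} {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

func main() {
	// One pending entry per fileset reader; popping always yields the
	// globally smallest (id, blockStart) pair.
	h := &entryHeap{{[]byte("id2"), 1}, {[]byte("id1"), 2}, {[]byte("id1"), 1}}
	heap.Init(h)
	for h.Len() > 0 {
		e := heap.Pop(h).(entry)
		fmt.Printf("%s @ %d\n", e.id, e.blockStart) // id1 @ 1, id1 @ 2, id2 @ 1
	}
}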
36 changes: 13 additions & 23 deletions src/dbnode/persist/fs/cross_block_reader_test.go
@@ -83,7 +83,8 @@ func TestCrossBlockReader(t *testing.T) {
 		{"many readers with different series", [][]string{{"id1"}, {"id2"}, {"id3"}}},
 		{"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}},
 		{"complex case", [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}},
-		{"reader error", [][]string{{"id1", "id2"}, {"id1", "error"}}},
+		{"immediate reader error", [][]string{{"error"}}},
+		{"reader error later", [][]string{{"id1", "id2"}, {"id1", "error"}}},
 	}

 	for _, tt := range tests {
@@ -131,19 +132,10 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) {
 	require.NoError(t, err)
 	defer cbReader.Close()

-	hadError := false
 	actualCount := 0
 	previousId := ""
-	for {
-		id, tags, datas, checksums, err := cbReader.Read()
-		if err == io.EOF {
-			break
-		}
-		if err != nil && err.Error() == expectedError.Error() {
-			hadError = true
-			break
-		}
-		require.NoError(t, err)
+	for cbReader.Next() {
+		id, tags, records := cbReader.Current()

 		strId := id.String()
 		id.Finalize()
@@ -152,25 +144,23 @@ func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) {
 		assert.NotNil(t, tags)
 		tags.Close()

-		assert.Equal(t, len(datas), len(checksums))
-
 		var previousBlockIndex uint32
-		for _, blockIndex := range checksums { // see the comment above
+		for _, record := range records {
+			blockIndex := record.Checksum // see the comment above
 			assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order")
 			previousBlockIndex = blockIndex
-		}
-
-		for _, data := range datas {
-			assert.NotNil(t, data)
-			data.DecRef()
-			data.Finalize()
+			assert.NotNil(t, record.Data)
+			record.Data.DecRef()
+			record.Data.Finalize()
 		}

 		previousId = strId
-		actualCount += len(datas)
+		actualCount += len(records)
 	}

-	if !hadError {
+	err = cbReader.Err()
+	if err == nil || err.Error() != expectedError.Error() {
+		require.NoError(t, cbReader.Err())
 		assert.Equal(t, expectedCount, actualCount, "count of series read")
 	}
 }
16 changes: 13 additions & 3 deletions src/dbnode/persist/fs/types.go
@@ -590,15 +590,25 @@ type Segments interface {
 	BlockStart() time.Time
 }

+// BlockRecord wraps together M3TSZ data bytes with their checksum.
+type BlockRecord struct {
+	Data     checked.Bytes
+	Checksum uint32
+}
+
 // CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard,
 // ordered by series id first, and block start time next.
 type CrossBlockReader interface {
 	io.Closer

-	// Read returns the next distinct id and tags, plus slices with data and checksums from all blocks corresponding to
-	// the id returned. Returns io.EOF after all DataFileSetReaders exhausted.
-	Read() (id ident.ID, tags ident.TagIterator, datas []checked.Bytes, checksums []uint32, err error)
+	Next() bool
+
+	Err() error
+
+	// Current returns distinct series id and tags, plus a slice with data and checksums from all blocks corresponding
+	// to that series (in temporal order).
+	// Note: make sure to finalize the ID, close the Tags and finalize the Data when done with
+	// them so they can be returned to their respective pools.
+	Current() (ident.ID, ident.TagIterator, []BlockRecord)
 }
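With this remodel, consumption follows Go's iterator idiom. A hedged usage sketch; the construction of dataFileSetReaders and the process function are assumed placeholders rather than part of this PR:

// Sketch: iterate series across all blocks of a shard, honoring the
// resource-ownership contract documented on Current.
cbReader, err := fs.NewCrossBlockReader(dataFileSetReaders) // readers assumed opened with OrderedByIndex
if err != nil {
	return err
}
defer cbReader.Close()

for cbReader.Next() {
	id, tags, records := cbReader.Current()
	for _, record := range records {
		process(record.Data, record.Checksum) // process is a hypothetical consumer
		record.Data.DecRef()
		record.Data.Finalize()
	}
	tags.Close()
	id.Finalize()
}
// Err reports any error that terminated iteration early.
return cbReader.Err()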