Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Cross-block series reader #2481

Merged
merged 27 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cf04c4b
[dbnode] Add OrderedByIndex option for DataFileSetReader.Open
linasm Jul 14, 2020
c55c21c
Merge branch 'large-tiles-aggregation' into linas/sorted-read
linasm Jul 23, 2020
76f5379
[dbnode] Cross block series reader
linasm Jul 22, 2020
6499aca
Assert on OrderedByIndex
linasm Jul 22, 2020
92d7758
Tests
linasm Jul 23, 2020
1211434
Mocks
linasm Jul 23, 2020
f3dead9
Dont test just the happy path
linasm Jul 23, 2020
3bddd14
Addressed review feedback
linasm Jul 25, 2020
8f68dd5
Legal stuff
linasm Jul 25, 2020
a456c95
Group Read() results by id
linasm Aug 4, 2020
cb60730
Remodel CrossBlockReader as an Iterator
linasm Aug 4, 2020
4a2749d
Mockgen
linasm Aug 4, 2020
83845d2
Erase slice contents before draining them
linasm Aug 5, 2020
0f41464
Merge branch 'large-tiles-aggregation' into linas/sorted-read
linasm Aug 5, 2020
367551a
Align with master
linasm Aug 5, 2020
f1402cf
Merge branch 'linas/sorted-read' into linasm/cross-block-series-reader
linasm Aug 5, 2020
3f3b493
Make a defensive copy of dataFileSetReaders
linasm Aug 5, 2020
6a1912f
Fuse else / if
linasm Aug 10, 2020
bbf26f9
Merge remote-tracking branch 'origin/linas/sorted-read' into linasm/c…
linasm Aug 10, 2020
8081417
Address feedback
linasm Aug 10, 2020
dd207d1
Mockgen
linasm Aug 10, 2020
21e4b13
Fix test
linasm Aug 10, 2020
a766cf4
Better conversion to string
linasm Aug 10, 2020
29f3b25
Address review feedback
linasm Aug 11, 2020
28c0cab
Check for duplicate ids
linasm Aug 11, 2020
79ee91b
Further feedback
linasm Aug 11, 2020
e19830b
Duplicate id is an invariant violation
linasm Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
Expand Down
168 changes: 168 additions & 0 deletions src/dbnode/persist/fs/cross_block_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package fs

import (
"bytes"
"container/heap"
"errors"
"fmt"
"io"
"time"

"github.com/m3db/m3/src/x/checked"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
)

var (
	// errReaderNotOrderedByIndex is returned by NewCrossBlockReader when any
	// supplied DataFileSetReader is not configured to read in index order.
	errReaderNotOrderedByIndex = errors.New("CrossBlockReader can only use DataFileSetReaders ordered by index")

	// Compile-time interface conformance checks.
	_ CrossBlockReader = (*crossBlockReader)(nil)
	_ heap.Interface   = (*minHeap)(nil)
)

// crossBlockReader merges the entries of multiple index-ordered
// DataFileSetReaders into a single stream ordered by series id
// (ties broken by reader position, i.e. block start time).
type crossBlockReader struct {
	dataFileSetReaders []DataFileSetReader
	// initialized is set on the first Read call, after the heap has been
	// seeded with one entry per reader.
	initialized bool
	minHeap     minHeap
	// NOTE(review): err is declared but not visibly assigned anywhere in this
	// file revision — presumably reserved for later use; confirm.
	err error
}

// minHeapEntry is a single series entry read from one DataFileSetReader,
// tagged with the index of the reader it came from.
type minHeapEntry struct {
	dataFileSetReaderIndex int
	id                     ident.ID
	tags                   ident.TagIterator
	data                   checked.Bytes
	checksum               uint32
}

// minHeap orders entries by id bytes, then by reader index (see Less).
type minHeap []*minHeapEntry

// NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders.
// DataFileSetReaders must be configured to return the data in the order of index, and must be
// provided in a slice sorted by block start time (strictly increasing).
// Callers are responsible for closing the DataFileSetReaders.
func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader) (*crossBlockReader, error) {
	var previousStart time.Time
	for _, dataFileSetReader := range dataFileSetReaders {
		if !dataFileSetReader.IsOrderedByIndex() {
			return nil, errReaderNotOrderedByIndex
		}
		currentStart := dataFileSetReader.Range().Start
		// Starts must be strictly increasing; equal or earlier starts are rejected.
		if !currentStart.After(previousStart) {
			return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart)
		}
		previousStart = currentStart
	}

	// Defensive copy: Read() nils out exhausted readers in this slice, which
	// must not be visible to the caller (nor be affected by the caller mutating
	// its own slice after construction).
	readers := make([]DataFileSetReader, len(dataFileSetReaders))
	copy(readers, dataFileSetReaders)

	return &crossBlockReader{dataFileSetReaders: readers}, nil
}

func (r *crossBlockReader) Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) {
if !r.initialized {
linasm marked this conversation as resolved.
Show resolved Hide resolved
r.initialized = true
err := r.init()
if err != nil {
return nil, nil, nil, 0, err
}
}

if len(r.minHeap) == 0 {
return nil, nil, nil, 0, io.EOF
}

entry := heap.Pop(&r.minHeap).(*minHeapEntry)
if r.dataFileSetReaders[entry.dataFileSetReaderIndex] != nil {
nextEntry, err := r.readFromDataFileSet(entry.dataFileSetReaderIndex)
if err == io.EOF {
// will no longer read from this one
r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil
} else if err != nil {
return nil, nil, nil, 0, err
} else {
heap.Push(&r.minHeap, nextEntry)
linasm marked this conversation as resolved.
Show resolved Hide resolved
}
}

return entry.id, entry.tags, entry.data, entry.checksum, nil
}

// init seeds the min-heap with the first entry of every reader.
// Readers that are empty (immediate io.EOF) are simply skipped.
func (r *crossBlockReader) init() error {
	r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders))

	for readerIndex := range r.dataFileSetReaders {
		entry, err := r.readFromDataFileSet(readerIndex)
		switch {
		case err == io.EOF:
			continue
		case err != nil:
			return err
		}
		r.minHeap = append(r.minHeap, entry)
	}

	heap.Init(&r.minHeap)
	return nil
}

func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) {
id, tags, data, checksum, err := r.dataFileSetReaders[index].Read()

if err == io.EOF {
return nil, err
}

if err != nil {
closeErr := r.Close()
if closeErr != nil {
return nil, xerrors.NewMultiError().Add(err).Add(closeErr)
}
return nil, err
linasm marked this conversation as resolved.
Show resolved Hide resolved
}

return &minHeapEntry{
dataFileSetReaderIndex: index,
id: id,
tags: tags,
data: data,
checksum: checksum,
}, nil
}

func (r *crossBlockReader) Close() error {
for _, entry := range r.minHeap {
entry.id.Finalize()
entry.tags.Close()
entry.data.DecRef()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already defer DecRef() inside of the DataFileSetReader already before returning the underlying data?

var data checked.Bytes
if r.bytesPool != nil {
        data = r.bytesPool.Get(int(entry.Size))
        data.IncRef()
        defer data.DecRef()
} else {
        data = checked.NewBytes(make([]byte, 0, entry.Size), nil)
        data.IncRef()
        defer data.DecRef()
}

I also commented this on the other PR: #2465 (comment)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this needs some additional thought, I guess I'll wait for @robskillington feedback on the questions that you raised on #2465.

entry.data.Finalize()
}
r.minHeap = r.minHeap[:0]
return nil
linasm marked this conversation as resolved.
Show resolved Hide resolved
}

// Len implements heap.Interface.
func (h minHeap) Len() int {
	return len(h)
}

// Less orders entries by series id bytes; equal ids are broken by reader
// index, so blocks of the same series come out in temporal order.
func (h minHeap) Less(i, j int) bool {
	if c := bytes.Compare(h[i].id.Bytes(), h[j].id.Bytes()); c != 0 {
		return c < 0
	}
	return h[i].dataFileSetReaderIndex < h[j].dataFileSetReaderIndex
}

// Swap implements heap.Interface.
func (h minHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push implements heap.Interface; x must be a *minHeapEntry.
func (h *minHeap) Push(x interface{}) {
	*h = append(*h, x.(*minHeapEntry))
}

// Pop implements heap.Interface, removing and returning the last element.
func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	entry := old[n-1]
	// Nil the vacated slot so the backing array does not keep the popped
	// entry reachable after it leaves the heap.
	old[n-1] = nil
	*h = old[:n-1]
	return entry
}
153 changes: 153 additions & 0 deletions src/dbnode/persist/fs/cross_block_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package fs

import (
	"errors"
	"fmt"
	"io"
	"strconv"
	"testing"
	"time"

	"github.com/m3db/m3/src/x/ident"
	xtest "github.com/m3db/m3/src/x/test"
	xtime "github.com/m3db/m3/src/x/time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// expectedError is a sentinel returned by mocked readers to exercise error paths.
var expectedError = errors.New("expected error")

// TestCrossBlockReaderRejectMisconfiguredInputs verifies that a reader not
// ordered by index is rejected at construction time.
func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) {
	ctrl := xtest.NewController(t)
	defer ctrl.Finish()

	unordered := NewMockDataFileSetReader(ctrl)
	unordered.EXPECT().IsOrderedByIndex().Return(false)

	_, err := NewCrossBlockReader([]DataFileSetReader{unordered})
	assert.Equal(t, errReaderNotOrderedByIndex, err)
}

// TestCrossBlockReaderRejectMisorderedInputs verifies that readers supplied
// out of block-start-time order are rejected at construction time.
func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) {
	ctrl := xtest.NewController(t)
	defer ctrl.Finish()

	now := time.Now().Truncate(time.Hour)
	later := now.Add(time.Hour)

	earlierReader := NewMockDataFileSetReader(ctrl)
	earlierReader.EXPECT().IsOrderedByIndex().Return(true)
	earlierReader.EXPECT().Range().Return(xtime.Range{Start: now})

	laterReader := NewMockDataFileSetReader(ctrl)
	laterReader.EXPECT().IsOrderedByIndex().Return(true)
	laterReader.EXPECT().Range().Return(xtime.Range{Start: later})

	// Deliberately pass the later block first.
	_, err := NewCrossBlockReader([]DataFileSetReader{laterReader, earlierReader})

	expectedErr := fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", later, now)
	assert.Equal(t, expectedErr, err)
}

func TestCrossBlockReader(t *testing.T) {
tests := []struct {
name string
blockSeriesIds [][]string
linasm marked this conversation as resolved.
Show resolved Hide resolved
}{
{"no readers", [][]string{}},
{"empty readers", [][]string{{}, {}, {}}},
{"one reader, one series", [][]string{{"id1"}}},
{"one reader, many series", [][]string{{"id1", "id2", "id3"}}},
{"many readers with same series", [][]string{{"id1"}, {"id1"}, {"id1"}}},
{"many readers with different series", [][]string{{"id1"}, {"id2"}, {"id3"}}},
{"many readers with unordered series", [][]string{{"id3"}, {"id1"}, {"id2"}}},
{"complex case", [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}},
{"reader error", [][]string{{"id1", "id2"}, {"id1", "error"}}},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testCrossBlockReader(t, tt.blockSeriesIds)
})
}
}

// testCrossBlockReader builds one mock DataFileSetReader per inner slice of
// blockSeriesIds (each listing the series ids that block yields, "error"
// meaning the mock returns expectedError), then reads everything through a
// CrossBlockReader and asserts global id ordering, per-id temporal ordering,
// and the total entry count.
func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string) {
	ctrl := xtest.NewController(t)
	defer ctrl.Finish()

	now := time.Now().Truncate(time.Hour)
	var dfsReaders []DataFileSetReader
	expectedCount := 0

	for blockIndex, ids := range blockSeriesIds {
		dfsReader := NewMockDataFileSetReader(ctrl)
		dfsReader.EXPECT().IsOrderedByIndex().Return(true)
		dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))})

		blockHasError := false
		for j, id := range ids {
			// Use strconv.Itoa, not string(j): string(j) converts the int to a
			// single rune (string(1) == "\x01"), not its decimal representation.
			tags := ident.NewTags(ident.StringTag("foo", strconv.Itoa(j)))
			data := checkedBytes([]byte{byte(j)})
			checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions
			if id == "error" {
				dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), expectedError)
				blockHasError = true
			} else {
				dfsReader.EXPECT().Read().Return(ident.StringID(id), ident.NewTagsIterator(tags), data, checksum, nil)
			}
		}

		if !blockHasError {
			dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF).MaxTimes(1)
		}

		dfsReaders = append(dfsReaders, dfsReader)
		expectedCount += len(ids)
	}

	cbReader, err := NewCrossBlockReader(dfsReaders)
	require.NoError(t, err)
	defer cbReader.Close()

	hadError := false
	actualCount := 0
	previousID := ""
	var previousBlockIndex uint32
	for {
		id, tags, data, checksum, err := cbReader.Read()
		if err == io.EOF {
			break
		}
		if err == expectedError {
			hadError = true
			break
		}
		require.NoError(t, err)

		strID := id.String()
		id.Finalize()
		blockIndex := checksum // see the comment above
		assert.True(t, strID >= previousID, "series must be read in non-decreasing id order")
		if strID == previousID {
			assert.True(t, blockIndex >= previousBlockIndex, "same id blocks must be read in temporal order")
		}

		assert.NotNil(t, tags)
		tags.Close()

		assert.NotNil(t, data)
		data.DecRef()
		data.Finalize()

		previousID = strID
		previousBlockIndex = blockIndex

		actualCount++
	}

	if !hadError {
		assert.Equal(t, expectedCount, actualCount, "count of series read")
	}
}
Loading