[dbnode] Cross-block series reader #2481

Merged 27 commits on Aug 11, 2020 (changes from all commits).

Commits
cf04c4b  [dbnode] Add OrderedByIndex option for DataFileSetReader.Open (linasm, Jul 14, 2020)
c55c21c  Merge branch 'large-tiles-aggregation' into linas/sorted-read (linasm, Jul 23, 2020)
76f5379  [dbnode] Cross block series reader (linasm, Jul 22, 2020)
6499aca  Assert on OrderedByIndex (linasm, Jul 22, 2020)
92d7758  Tests (linasm, Jul 23, 2020)
1211434  Mocks (linasm, Jul 23, 2020)
f3dead9  Dont test just the happy path (linasm, Jul 23, 2020)
3bddd14  Addressed review feedback (linasm, Jul 25, 2020)
8f68dd5  Legal stuff (linasm, Jul 25, 2020)
a456c95  Group Read() results by id (linasm, Aug 4, 2020)
cb60730  Remodel CrossBlockReader as an Iterator (linasm, Aug 4, 2020)
4a2749d  Mockgen (linasm, Aug 4, 2020)
83845d2  Erase slice contents before draining them (linasm, Aug 5, 2020)
0f41464  Merge branch 'large-tiles-aggregation' into linas/sorted-read (linasm, Aug 5, 2020)
367551a  Align with master (linasm, Aug 5, 2020)
f1402cf  Merge branch 'linas/sorted-read' into linasm/cross-block-series-reader (linasm, Aug 5, 2020)
3f3b493  Make a defensive copy of dataFileSetReaders (linasm, Aug 5, 2020)
6a1912f  Fuse else / if (linasm, Aug 10, 2020)
bbf26f9  Merge remote-tracking branch 'origin/linas/sorted-read' into linasm/c… (linasm, Aug 10, 2020)
8081417  Address feedback (linasm, Aug 10, 2020)
dd207d1  Mockgen (linasm, Aug 10, 2020)
21e4b13  Fix test (linasm, Aug 10, 2020)
a766cf4  Better conversion to string (linasm, Aug 10, 2020)
29f3b25  Address review feedback (linasm, Aug 11, 2020)
28c0cab  Check for duplicate ids (linasm, Aug 11, 2020)
79ee91b  Further feedback (linasm, Aug 11, 2020)
e19830b  Duplicate id is an invariant violation (linasm, Aug 11, 2020)
src/dbnode/generated/mocks/generate.go (1 addition, 1 deletion)

@@ -20,7 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

-//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
+//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
src/dbnode/persist/fs/cross_block_reader.go (274 additions, new file)

@@ -0,0 +1,274 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fs

import (
	"bytes"
	"container/heap"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/m3db/m3/src/x/checked"
	xerrors "github.com/m3db/m3/src/x/errors"
	"github.com/m3db/m3/src/x/ident"
	"github.com/m3db/m3/src/x/instrument"

	"go.uber.org/zap"
)

var (
	errReaderNotOrderedByIndex = errors.New("crossBlockReader can only use DataFileSetReaders ordered by index")
	errEmptyReader             = errors.New("trying to read from empty reader")

	// Compile-time assertion that *minHeap implements heap.Interface.
	_ heap.Interface = (*minHeap)(nil)
)

type crossBlockReader struct {
	dataFileSetReaders []DataFileSetReader
	id                 ident.ID
	tags               ident.TagIterator
	records            []BlockRecord
	started            bool
	minHeap            minHeap
	err                error
	iOpts              instrument.Options
}

// NewCrossBlockReader constructs a new CrossBlockReader based on the given DataFileSetReaders.
// The DataFileSetReaders must be configured to return data in index order, and must be
// provided in a slice sorted by block start time.
// Callers are responsible for closing the DataFileSetReaders.
func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader, iOpts instrument.Options) (CrossBlockReader, error) {
	var previousStart time.Time
	for _, dataFileSetReader := range dataFileSetReaders {
		if !dataFileSetReader.OrderedByIndex() {
			return nil, errReaderNotOrderedByIndex
		}
		currentStart := dataFileSetReader.Range().Start
		if !currentStart.After(previousStart) {
			return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart)
		}
		previousStart = currentStart
	}

	return &crossBlockReader{
		dataFileSetReaders: append(make([]DataFileSetReader, 0, len(dataFileSetReaders)), dataFileSetReaders...),
		records:            make([]BlockRecord, 0, len(dataFileSetReaders)),
		iOpts:              iOpts,
	}, nil
}
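
// A minimal usage sketch of the iterator API defined in this file (illustrative
// only; assumes `readers` holds DataFileSetReaders opened with OrderedByIndex
// and sorted by block start time, and that errors are handled by the caller):
//
//	reader, err := NewCrossBlockReader(readers, iOpts)
//	if err != nil {
//		return err
//	}
//	defer reader.Close()
//	for reader.Next() {
//		id, tags, records := reader.Current()
//		_, _, _ = id, tags, records // consume before the next call to Next()
//	}
//	return reader.Err()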

func (r *crossBlockReader) Next() bool {
	if r.err != nil {
		return false
	}

	var emptyRecord BlockRecord
	if !r.started {
		if r.err = r.start(); r.err != nil {
			return false
		}
	} else {
		// Use an empty var in the inner loop with "for i := range" so that the
		// compiler applies the memclr optimization.
		// See: https://codereview.appspot.com/137880043
		for i := range r.records {
			r.records[i] = emptyRecord
		}
	}

	if len(r.minHeap) == 0 {
		return false
	}

	firstEntry, err := r.readOne()
	if err != nil {
		r.err = err
		return false
	}

	r.id = firstEntry.id
	r.tags = firstEntry.tags

	r.records = r.records[:0]
	r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum})

	for len(r.minHeap) > 0 && r.minHeap[0].id.Equal(firstEntry.id) {
		nextEntry, err := r.readOne()
		if err != nil {
			// Close the resources that were already read but not returned to the consumer:
			r.id.Finalize()
			r.tags.Close()
			for _, record := range r.records {
				record.Data.DecRef()
				record.Data.Finalize()
			}
			for i := range r.records {
				r.records[i] = emptyRecord
			}
			r.records = r.records[:0]
			r.err = err
			return false
		}

		// The id and tags are not needed for subsequent blocks because they are
		// the same as in the first block.
		nextEntry.id.Finalize()
		nextEntry.tags.Close()

		r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum})

[Review thread on the line above]

Reviewer (Collaborator): nit: is it worth having a sanity check to see that len(r.records) <= len(dataFileSetReaders)? Otherwise we necessarily have a duplicate ID in one of the readers.

Author (linasm): Good point, just that len(dataFileSetReaders) is not the best indicator of duplicate ids, because I invalidate (set to nil) already exhausted dataFileSetReaders and don't track the number of still-valid ones. Instead, I implemented a more direct check for duplicate ids in readOne (and pushed it as a separate commit for clarity). I'm just not completely sure whether, for our use case, it would make more sense to return an error and fail the whole process, or to skip the duplicate. What do you think?

Author (linasm): As per our conversation, keeping the error approach, and reporting it as an invariant violation.

	}

	return true
}

func (r *crossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) {
	return r.id, r.tags, r.records
}

func (r *crossBlockReader) readOne() (*minHeapEntry, error) {
	if len(r.minHeap) == 0 {
		return nil, errEmptyReader
	}

	entry := heap.Pop(&r.minHeap).(*minHeapEntry)
	if r.dataFileSetReaders[entry.dataFileSetReaderIndex] != nil {
		nextEntry, err := r.readFromDataFileSet(entry.dataFileSetReaderIndex)
		if err == io.EOF {
			// Will no longer read from this reader.
			r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil
		} else if err != nil {
			return nil, err
		} else if bytes.Equal(nextEntry.id.Bytes(), entry.id.Bytes()) {
			err := fmt.Errorf("duplicate id %s on block starting at %s",
				entry.id, r.dataFileSetReaders[entry.dataFileSetReaderIndex].Range().Start)

			instrument.EmitAndLogInvariantViolation(r.iOpts, func(l *zap.Logger) {
				l.Error(err.Error())
			})

			return nil, err
		} else {
			heap.Push(&r.minHeap, nextEntry)
		}
	}

	return entry, nil
}

func (r *crossBlockReader) start() error {
	r.started = true
	r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders))

	for i := range r.dataFileSetReaders {
		entry, err := r.readFromDataFileSet(i)
		if err == io.EOF {
			continue
		}
		if err != nil {
			return err
		}
		r.minHeap = append(r.minHeap, entry)
	}

	heap.Init(&r.minHeap)

	return nil
}

func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) {
	id, tags, data, checksum, err := r.dataFileSetReaders[index].Read()

	if err == io.EOF {
		return nil, err
	}

	if err != nil {
		multiErr := xerrors.NewMultiError().
			Add(err).
			Add(r.Close())
		return nil, multiErr.FinalError()
	}

	return &minHeapEntry{
		dataFileSetReaderIndex: index,
		id:                     id,
		tags:                   tags,
		data:                   data,
		checksum:               checksum,
	}, nil
}

func (r *crossBlockReader) Err() error {
	return r.err
}

func (r *crossBlockReader) Close() error {
	// Close the resources that were buffered in minHeap:
	for i, entry := range r.minHeap {
		entry.id.Finalize()
		entry.tags.Close()
		entry.data.DecRef()

[Review thread on the line above]

Reviewer (Contributor): Don't we already defer DecRef() inside of the DataFileSetReader before returning the underlying data?

	var data checked.Bytes
	if r.bytesPool != nil {
		data = r.bytesPool.Get(int(entry.Size))
		data.IncRef()
		defer data.DecRef()
	} else {
		data = checked.NewBytes(make([]byte, 0, entry.Size), nil)
		data.IncRef()
		defer data.DecRef()
	}

I also commented this on the other PR: #2465 (comment)

Author (linasm): Hmm, this needs some additional thought; I'll wait for @robskillington's feedback on the questions that you raised on #2465.
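
For context, a minimal sketch of the checked.Bytes reference-counting lifecycle under discussion (an illustrative snippet, not code from this PR; the Bytes() accessor is assumed from the checked package, while NewBytes/IncRef/DecRef/Finalize all appear above):

	data := checked.NewBytes(make([]byte, 0, 16), nil)
	data.IncRef()    // take a reference before touching the underlying bytes
	_ = data.Bytes() // safe to access while at least one reference is held
	data.DecRef()    // release the reference when done
	data.Finalize()  // return the bytes to their pool, if any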

		entry.data.Finalize()
		r.minHeap[i] = nil
	}

	r.minHeap = r.minHeap[:0]
	return nil
}

type minHeapEntry struct {
	dataFileSetReaderIndex int
	id                     ident.ID
	tags                   ident.TagIterator
	data                   checked.Bytes
	checksum               uint32
}

type minHeap []*minHeapEntry

func (h minHeap) Len() int {
	return len(h)
}

func (h minHeap) Less(i, j int) bool {
	idsCmp := bytes.Compare(h[i].id.Bytes(), h[j].id.Bytes())
	if idsCmp == 0 {
		// Break id ties by reader index, i.e. by block start time.
		return h[i].dataFileSetReaderIndex < h[j].dataFileSetReaderIndex
	}
	return idsCmp < 0
}

func (h minHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *minHeap) Push(x interface{}) {
	*h = append(*h, x.(*minHeapEntry))
}

func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	old[n-1] = nil
	*h = old[0 : n-1]
	return x
}
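
For reference, a minimal standalone sketch (illustrative, not part of this PR) of how container/heap drives a heap.Interface implementation like the minHeap above: heap.Init establishes the heap invariants, and heap.Pop always yields the smallest element according to Less.

	package main

	import (
		"container/heap"
		"fmt"
	)

	// intHeap mirrors the minHeap pattern above, with plain ints.
	type intHeap []int

	func (h intHeap) Len() int            { return len(h) }
	func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
	func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
	func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
	func (h *intHeap) Pop() interface{} {
		old := *h
		n := len(old)
		x := old[n-1]
		*h = old[:n-1]
		return x
	}

	func main() {
		h := &intHeap{3, 1, 2}
		heap.Init(h)    // O(n) heapify
		heap.Push(h, 0) // keeps the heap invariant
		for h.Len() > 0 {
			fmt.Println(heap.Pop(h)) // prints 0, 1, 2, 3 in ascending order
		}
	}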