Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBNode] - Optimize Cold Flush Merge Logic #1829

Merged
merged 3 commits
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 103 additions & 43 deletions src/dbnode/persist/fs/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,15 @@ func (m *merger) Merge(
}

var (
// There will only be one BlockReader slice since we're working within one
// block here.
brs = make([][]xio.BlockReader, 0, 1)
// There will likely be at least two BlockReaders - one for disk data and
// There will likely be at least two SegmentReaders - one for disk data and
// one for data from the merge target.
br = make([]xio.BlockReader, 0, 2)
segmentReaders = make([]xio.SegmentReader, 0, 2)

// It's safe to share these between iterations and just reset them each
// time because the series gets persisted each loop, so the previous
// iterations' reader and iterator will never be needed.
segReader = srPool.Get()
multiIter = multiIterPool.Get()
// Initialize this here with nil to be reset before each iteration's
// use.
sliceOfSlices = xio.NewReaderSliceOfSlicesFromBlockReadersIterator(nil)
// Reused context for use in mergeWith.Read, since they all do a
// BlockingClose after usage.
tmpCtx = context.NewContext()
Expand All @@ -172,6 +166,15 @@ func (m *merger) Merge(
// this merge closes the underlying writer).
idsToFinalize = make([]ident.ID, 0, reader.Entries())
tagsToFinalize = make([]ident.Tags, 0, reader.Entries())

// Shared between iterations.
iterResources = newIterResources(
multiIter,
blockStart.ToTime(),
blockSize,
blockAllocSize,
nsCtx.Schema,
encoderPool)
)
defer func() {
segReader.Finalize()
Expand All @@ -198,10 +201,8 @@ func (m *merger) Merge(
}
idsToFinalize = append(idsToFinalize, id)

// Reset BlockReaders.
brs = brs[:0]
br = br[:0]
br = append(br, blockReaderFromData(data, segReader, startTime, blockSize))
segmentReaders = segmentReaders[:0]
segmentReaders = append(segmentReaders, segmentReaderFromData(data, segReader))

// Check if this series is in memory (and thus requires merging).
tmpCtx.Reset()
Expand All @@ -210,12 +211,8 @@ func (m *merger) Merge(
return err
}
if hasData {
br = append(br, mergeWithData...)
segmentReaders = appendBlockReadersToSegmentReaders(segmentReaders, mergeWithData)
}
brs = append(brs, br)

sliceOfSlices.Reset(brs)
multiIter.ResetSliceOfSlices(sliceOfSlices, nsCtx.Schema)

// tagsIter is never nil. These tags will be valid as long as the IDs
// are valid, and the IDs are valid for the duration of the file writing.
Expand All @@ -225,8 +222,8 @@ func (m *merger) Merge(
return err
}
tagsToFinalize = append(tagsToFinalize, tags)
if err := persistIter(prepared.Persist, multiIter, startTime,
id, tags, blockAllocSize, nsCtx.Schema, encoderPool); err != nil {

if err := persistSegmentReaders(id, tags, segmentReaders, iterResources, prepared.Persist); err != nil {
return err
}
// Closing the context will finalize the data returned from
Expand All @@ -239,13 +236,10 @@ func (m *merger) Merge(
tmpCtx.Reset()
err = mergeWith.ForEachRemaining(
tmpCtx, blockStart,
func(seriesID ident.ID, tags ident.Tags, mergeWithData []xio.BlockReader) error {
brs = brs[:0]
brs = append(brs, mergeWithData)
sliceOfSlices.Reset(brs)
multiIter.ResetSliceOfSlices(sliceOfSlices, nsCtx.Schema)
err := persistIter(prepared.Persist, multiIter, startTime,
seriesID, tags, blockAllocSize, nsCtx.Schema, encoderPool)
func(id ident.ID, tags ident.Tags, mergeWithData []xio.BlockReader) error {
segmentReaders = segmentReaders[:0]
segmentReaders = appendBlockReadersToSegmentReaders(segmentReaders, mergeWithData)
err := persistSegmentReaders(id, tags, segmentReaders, iterResources, prepared.Persist)
// Context is safe to close after persisting data to disk.
tmpCtx.BlockingClose()
// Reset context here within the passed in function so that the
Expand All @@ -263,33 +257,51 @@ func (m *merger) Merge(
return prepared.Close()
}

func blockReaderFromData(
// appendBlockReadersToSegmentReaders extracts the SegmentReader out of each
// given BlockReader and appends it to segReaders, returning the (possibly
// grown) slice. The block start/size metadata carried by the BlockReaders is
// intentionally dropped; only the underlying readers are kept.
func appendBlockReadersToSegmentReaders(segReaders []xio.SegmentReader, brs []xio.BlockReader) []xio.SegmentReader {
	for i := range brs {
		segReaders = append(segReaders, brs[i].SegmentReader)
	}
	return segReaders
}

func segmentReaderFromData(
data checked.Bytes,
segReader xio.SegmentReader,
startTime time.Time,
blockSize time.Duration,
) xio.BlockReader {
) xio.SegmentReader {
seg := ts.NewSegment(data, nil, ts.FinalizeHead)
segReader.Reset(seg)
return xio.BlockReader{
SegmentReader: segReader,
Start: startTime,
BlockSize: blockSize,
return segReader
}

// persistSegmentReaders persists the data held by segReaders for the series
// identified by id/tags via persistFn. Zero readers is a no-op; a single
// reader is persisted directly without any merging; two or more readers are
// merged through the shared iterator resources before being persisted.
func persistSegmentReaders(
	id ident.ID,
	tags ident.Tags,
	segReaders []xio.SegmentReader,
	ir iterResources,
	persistFn persist.DataFn,
) error {
	switch len(segReaders) {
	case 0:
		// Nothing to write for this series.
		return nil
	case 1:
		// Fast path: a lone segment can be written as-is, skipping the
		// iterate-and-re-encode merge step entirely.
		return persistSegmentReader(id, tags, segReaders[0], persistFn)
	default:
		// Multiple segments must be merged by iterating and re-encoding.
		return persistIter(id, tags, segReaders, ir, persistFn)
	}
}

func persistIter(
persistFn persist.DataFn,
it encoding.Iterator,
blockStart time.Time,
id ident.ID,
tags ident.Tags,
blockAllocSize int,
schema namespace.SchemaDescr,
encoderPool encoding.EncoderPool,
segReaders []xio.SegmentReader,
ir iterResources,
persistFn persist.DataFn,
) error {
encoder := encoderPool.Get()
encoder.Reset(blockStart, blockAllocSize, schema)
it := ir.multiIter
it.Reset(segReaders, ir.blockStart, ir.blockSize, ir.schema)
encoder := ir.encoderPool.Get()
encoder.Reset(ir.blockStart, ir.blockAllocSize, ir.schema)
for it.Next() {
if err := encoder.Encode(it.Current()); err != nil {
encoder.Close()
Expand All @@ -302,7 +314,55 @@ func persistIter(
}

segment := encoder.Discard()
checksum := digest.SegmentChecksum(segment)
return persistSegment(id, tags, segment, persistFn)
}

// persistSegmentReader persists the segment backing a single SegmentReader
// for the series identified by id/tags. Returns any error encountered while
// extracting the segment, or from the persist call itself.
func persistSegmentReader(
	id ident.ID,
	tags ident.Tags,
	segmentReader xio.SegmentReader,
	persistFn persist.DataFn,
) error {
	seg, err := segmentReader.Segment()
	if err != nil {
		return err
	}
	return persistSegment(id, tags, seg, persistFn)
}

// persistSegment checksums the given segment and hands it, with the series
// id and tags, to persistFn for writing.
func persistSegment(
	id ident.ID,
	tags ident.Tags,
	segment ts.Segment,
	persistFn persist.DataFn,
) error {
	return persistFn(id, tags, segment, digest.SegmentChecksum(segment))
}

// iterResources bundles the state shared across per-series persist calls
// during a merge: a reusable multi-reader iterator plus the block and
// encoding parameters needed to reset it (and an encoder) for each series.
type iterResources struct {
	// multiIter is reused across series rather than allocated per call.
	multiIter encoding.MultiReaderIterator
	// blockStart/blockSize identify the block being merged.
	blockStart time.Time
	blockSize time.Duration
	// blockAllocSize is the initial allocation hint for encoder resets.
	blockAllocSize int
	// schema describes the namespace schema used when resetting iterators
	// and encoders.
	schema namespace.SchemaDescr
	// encoderPool supplies encoders for re-encoding merged data.
	encoderPool encoding.EncoderPool
}

// newIterResources builds an iterResources value from the shared iterator,
// block parameters, namespace schema, and encoder pool.
func newIterResources(
	multiIter encoding.MultiReaderIterator,
	blockStart time.Time,
	blockSize time.Duration,
	blockAllocSize int,
	schema namespace.SchemaDescr,
	encoderPool encoding.EncoderPool,
) iterResources {
	var r iterResources
	r.multiIter = multiIter
	r.blockStart = blockStart
	r.blockSize = blockSize
	r.blockAllocSize = blockAllocSize
	r.schema = schema
	r.encoderPool = encoderPool
	return r
}
15 changes: 15 additions & 0 deletions src/dbnode/persist/fs/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,18 @@ func datapointsFromSegment(t *testing.T, seg ts.Segment) []ts.Datapoint {

return dps
}

// blockReaderFromData is a test helper that wraps raw series bytes in a
// BlockReader: the bytes become a segment (finalizing the head on close),
// the provided SegmentReader is reset onto that segment, and the resulting
// reader is stamped with the given block start and size.
func blockReaderFromData(
	data checked.Bytes,
	segReader xio.SegmentReader,
	startTime time.Time,
	blockSize time.Duration,
) xio.BlockReader {
	segment := ts.NewSegment(data, nil, ts.FinalizeHead)
	segReader.Reset(segment)
	br := xio.BlockReader{
		SegmentReader: segReader,
		Start:         startTime,
		BlockSize:     blockSize,
	}
	return br
}