Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Do not unnecessarily sort index entries in bootstrap paths #2533

Merged
merged 4 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/dbnode/persist/fs/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var (

// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index
errReadNotExpectedSize = errors.New("next read not expected size")

// errDoNotSortSetForRead returned when we specified do not sort but are attempting a regular read
errDoNotSortSetForRead = errors.New("do not sort set for regular read (index entries out of order)")
)

const (
Expand Down Expand Up @@ -99,6 +102,10 @@ type reader struct {
shard uint32
volume int
open bool
// NB(bodu): Informs whether or not we sort index entries.
// We don't need to sort for reading metadata but sorting is
// required if we are performing regular reads.
doNotSort bool
}

// NewReader returns a new reader and expects all files to exist. Will read the
Expand Down Expand Up @@ -271,6 +278,7 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
r.open = true
r.namespace = namespace
r.shard = shard
r.doNotSort = opts.DoNotSort

return nil
}
Expand Down Expand Up @@ -337,13 +345,20 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
}
r.indexEntriesByOffsetAsc = append(r.indexEntriesByOffsetAsc, entry)
}
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
// This is false by default so we always sort unless otherwise specified.
if !r.doNotSort {
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data. This is only required for regular reads.
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
}
return nil
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
// NB(bodu): We cannot perform regular reads if we specified no sorting.
if r.doNotSort {
return nil, nil, nil, 0, errDoNotSortSetForRead
}
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
// Have not read the index yet, this is required when reading
// data as we need each index entry in order by the offset ascending
Expand Down
5 changes: 5 additions & 0 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ type DataFileSetReaderStatus struct {
type DataReaderOpenOptions struct {
Identifier FileSetFileIdentifier
FileSetType persist.FileSetType
// NB(bodu): This option can inform the reader to not sort
// if the downstream consumer will only be reading metadata.
// Setting this option will throw an error if a regular `Read()`
// is attempted.
DoNotSort bool
Copy link
Collaborator

@robskillington robskillington Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe we call this "ReadMetadataOnlyOptimized bool" - that way if can do further efficiency gains we can lump them behind the same flag?

}

// DataFileSetReader provides an unsynchronized reader for a TSDB file set
Expand Down
41 changes: 30 additions & 11 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
Expand All @@ -48,6 +49,7 @@ import (
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -69,6 +71,7 @@ type fileSystemSource struct {
opts Options
fsopts fs.Options
log *zap.Logger
nowFn clock.NowFn
idPool ident.Pool
newReaderFn newDataFileSetReaderFn
newReaderPoolOpts bootstrapper.NewReaderPoolOptions
Expand Down Expand Up @@ -96,6 +99,7 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {
opts: opts,
fsopts: opts.FilesystemOptions(),
log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")),
nowFn: opts.ResultOptions().ClockOptions().NowFn(),
idPool: opts.IdentifierPool(),
newReaderFn: fs.NewReader,
persistManager: &bootstrapper.SharedPersistManager{
Expand Down Expand Up @@ -150,8 +154,7 @@ func (s *fileSystemSource) Read(

// NB(r): Perform all data bootstrapping first then index bootstrapping
// to more clearly delineate which process is slower than the other.
nowFn := s.opts.ResultOptions().ClockOptions().NowFn()
start := nowFn()
start := s.nowFn()
dataLogFields := []zapcore.Field{
zap.Stringer("cachePolicy", s.opts.ResultOptions().SeriesCachePolicy()),
}
Expand All @@ -164,7 +167,7 @@ func (s *fileSystemSource) Read(

r, err := s.read(bootstrapDataRunType, md, namespace.DataAccumulator,
namespace.DataRunOptions.ShardTimeRanges,
namespace.DataRunOptions.RunOptions, builder)
namespace.DataRunOptions.RunOptions, builder, span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand All @@ -176,10 +179,10 @@ func (s *fileSystemSource) Read(
})
}
s.log.Info("bootstrapping time series data success",
append(dataLogFields, zap.Duration("took", nowFn().Sub(start)))...)
append(dataLogFields, zap.Duration("took", s.nowFn().Sub(start)))...)
span.LogEvent("bootstrap_data_done")

start = nowFn()
start = s.nowFn()
s.log.Info("bootstrapping index metadata start")
span.LogEvent("bootstrap_index_start")
for _, elem := range namespaces.Namespaces.Iter() {
Expand All @@ -194,7 +197,7 @@ func (s *fileSystemSource) Read(

r, err := s.read(bootstrapIndexRunType, md, namespace.DataAccumulator,
namespace.IndexRunOptions.ShardTimeRanges,
namespace.IndexRunOptions.RunOptions, builder)
namespace.IndexRunOptions.RunOptions, builder, span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand All @@ -210,7 +213,7 @@ func (s *fileSystemSource) Read(
results.Results.Set(md.ID(), result)
}
s.log.Info("bootstrapping index metadata success",
zap.Duration("took", nowFn().Sub(start)))
zap.Duration("took", s.nowFn().Sub(start)))
span.LogEvent("bootstrap_index_done")

return results, nil
Expand Down Expand Up @@ -459,9 +462,8 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
var (
indexBlockSize = ns.Options().IndexOptions().BlockSize()
retentionPeriod = ns.Options().RetentionOptions().RetentionPeriod()
nowFn = s.opts.ResultOptions().ClockOptions().NowFn()
beginningOfIndexRetention = retention.FlushTimeStartForRetentionPeriod(
retentionPeriod, indexBlockSize, nowFn())
retentionPeriod, indexBlockSize, s.nowFn())
initialIndexRange = xtime.Range{
Start: beginningOfIndexRetention,
End: beginningOfIndexRetention.Add(indexBlockSize),
Expand Down Expand Up @@ -677,6 +679,7 @@ func (s *fileSystemSource) read(
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
builder *result.IndexBuilder,
span opentracing.Span,
) (*runResult, error) {
var (
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy()
Expand Down Expand Up @@ -706,6 +709,7 @@ func (s *fileSystemSource) read(
}

if run == bootstrapIndexRunType {
span.LogEvent("bootstrap_from_index_persisted_blocks_start")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the md.ID().String() and shardTimeRanges.SummaryString() as tags to this span?

// NB(r): First read all the FSTs and add to runResult index results,
// subtract the shard + time ranges from what we intend to bootstrap
// for those we found.
Expand All @@ -720,6 +724,7 @@ func (s *fileSystemSource) read(
// Set or merge result.
setOrMergeResult(r.result)
}
span.LogEvent("bootstrap_from_index_persisted_blocks_done")
}

// Create a reader pool once per bootstrap as we don't really want to
Expand All @@ -737,8 +742,22 @@ func (s *fileSystemSource) read(
panic(fmt.Errorf("unrecognized run type: %d", run))
}
runtimeOpts := s.opts.RuntimeOptionsManager().Get()
go bootstrapper.EnqueueReaders(md, runOpts, runtimeOpts, s.fsopts, shardsTimeRanges,
readerPool, readersCh, blockSize, s.log)
go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{
NsMD: md,
RunOpts: runOpts,
RuntimeOpts: runtimeOpts,
FsOpts: s.fsopts,
ShardTimeRanges: shardsTimeRanges,
ReaderPool: readerPool,
ReadersCh: readersCh,
BlockSize: blockSize,
// NB(bodu): We only read metadata when bootstrap index
// so we do not need to sort the data fileset reader.
DataReaderDoNotSort: run == bootstrapIndexRunType,
Logger: s.log,
Span: span,
NowFn: s.nowFn,
})
bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md,
accumulator, runOpts, readerPool, readersCh, builder)

Expand Down
33 changes: 24 additions & 9 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ import (
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"

"github.com/m3db/m3/src/x/instrument"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -144,8 +144,7 @@ func (s *peersSource) Read(

// NB(r): Perform all data bootstrapping first then index bootstrapping
// to more clearly delineate which process is slower than the other.
nowFn := s.opts.ResultOptions().ClockOptions().NowFn()
start := nowFn()
start := s.nowFn()
s.log.Info("bootstrapping time series data start")
span.LogEvent("bootstrap_data_start")
for _, elem := range namespaces.Namespaces.Iter() {
Expand All @@ -166,7 +165,7 @@ func (s *peersSource) Read(
})
}
s.log.Info("bootstrapping time series data success",
zap.Duration("took", nowFn().Sub(start)))
zap.Duration("took", s.nowFn().Sub(start)))
span.LogEvent("bootstrap_data_done")

alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator()
Expand All @@ -176,7 +175,7 @@ func (s *peersSource) Read(
}
builder := result.NewIndexBuilder(segBuilder)

start = nowFn()
start = s.nowFn()
s.log.Info("bootstrapping index metadata start")
span.LogEvent("bootstrap_index_start")
for _, elem := range namespaces.Namespaces.Iter() {
Expand All @@ -193,7 +192,8 @@ func (s *peersSource) Read(
r, err := s.readIndex(md,
namespace.IndexRunOptions.ShardTimeRanges,
builder,
namespace.IndexRunOptions.RunOptions)
namespace.IndexRunOptions.RunOptions,
span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand All @@ -210,7 +210,7 @@ func (s *peersSource) Read(
results.Results.Set(md.ID(), result)
}
s.log.Info("bootstrapping index metadata success",
zap.Duration("took", nowFn().Sub(start)))
zap.Duration("took", s.nowFn().Sub(start)))
span.LogEvent("bootstrap_index_done")

return results, nil
Expand Down Expand Up @@ -660,6 +660,7 @@ func (s *peersSource) readIndex(
shardsTimeRanges result.ShardTimeRanges,
builder *result.IndexBuilder,
opts bootstrap.RunOptions,
span opentracing.Span,
) (result.IndexBootstrapResult, error) {
if err := s.validateRunOpts(opts); err != nil {
return nil, err
Expand Down Expand Up @@ -691,8 +692,22 @@ func (s *peersSource) readIndex(
zap.Int("shards", count),
)

go bootstrapper.EnqueueReaders(ns, opts, runtimeOpts, fsOpts, shardsTimeRanges, readerPool,
readersCh, indexBlockSize, s.log)
go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{
NsMD: ns,
RunOpts: opts,
RuntimeOpts: runtimeOpts,
FsOpts: fsOpts,
ShardTimeRanges: shardsTimeRanges,
ReaderPool: readerPool,
ReadersCh: readersCh,
BlockSize: indexBlockSize,
// NB(bodu): We only read metadata when performing a peers bootstrap
// so we do not need to sort the data fileset reader.
DataReaderDoNotSort: true,
Logger: s.log,
Span: span,
NowFn: s.nowFn,
})

for timeWindowReaders := range readersCh {
// NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks,
Expand Down
Loading