Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Do not unnecessarily sort index entries in bootstrap paths #2533

Merged
merged 4 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/dbnode/persist/fs/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var (

// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index
errReadNotExpectedSize = errors.New("next read not expected size")

// errReadMetadataOptimizedForRead returned when we optimized for only reading metadata but are attempting a regular read
errReadMetadataOptimizedForRead = errors.New("read metadata optimized for regular read")
)

const (
Expand Down Expand Up @@ -99,6 +102,10 @@ type reader struct {
shard uint32
volume int
open bool
// NB(bodu): Informs whether or not we optimize for only reading
// metadata. We don't need to sort for reading metadata but sorting is
// required if we are performing regular reads.
optimizedReadMetadataOnly bool
}

// NewReader returns a new reader and expects all files to exist. Will read the
Expand Down Expand Up @@ -271,6 +278,7 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
r.open = true
r.namespace = namespace
r.shard = shard
r.optimizedReadMetadataOnly = opts.OptimizedReadMetadataOnly

return nil
}
Expand Down Expand Up @@ -337,13 +345,20 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
}
r.indexEntriesByOffsetAsc = append(r.indexEntriesByOffsetAsc, entry)
}
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
// This is false by default so we always sort unless otherwise specified.
if !r.optimizedReadMetadataOnly {
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data. This is only required for regular reads.
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
}
return nil
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
// NB(bodu): We cannot perform regular reads if we're optimizing for only reading metadata.
if r.optimizedReadMetadataOnly {
return nil, nil, nil, 0, errReadMetadataOptimizedForRead
}
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
// Have not read the index yet, this is required when reading
// data as we need each index entry in order by the offset ascending
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ type DataFileSetReaderStatus struct {
type DataReaderOpenOptions struct {
Identifier FileSetFileIdentifier
FileSetType persist.FileSetType
// NB(bodu): This option can inform the reader to optimize for reading
// only metadata by not sorting index entries. Setting this option will
// throw an error if a regular `Read()` is attempted.
OptimizedReadMetadataOnly bool
}

// DataFileSetReader provides an unsynchronized reader for a TSDB file set
Expand Down
85 changes: 56 additions & 29 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
Expand All @@ -48,6 +49,8 @@ import (
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -69,6 +72,7 @@ type fileSystemSource struct {
opts Options
fsopts fs.Options
log *zap.Logger
nowFn clock.NowFn
idPool ident.Pool
newReaderFn newDataFileSetReaderFn
newReaderPoolOpts bootstrapper.NewReaderPoolOptions
Expand Down Expand Up @@ -96,6 +100,7 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {
opts: opts,
fsopts: opts.FilesystemOptions(),
log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")),
nowFn: opts.ResultOptions().ClockOptions().NowFn(),
idPool: opts.IdentifierPool(),
newReaderFn: fs.NewReader,
persistManager: &bootstrapper.SharedPersistManager{
Expand All @@ -116,18 +121,18 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {

func (s *fileSystemSource) AvailableData(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) (result.ShardTimeRanges, error) {
return s.availability(md, shardsTimeRanges)
return s.availability(md, shardTimeRanges)
}

func (s *fileSystemSource) AvailableIndex(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) (result.ShardTimeRanges, error) {
return s.availability(md, shardsTimeRanges)
return s.availability(md, shardTimeRanges)
}

func (s *fileSystemSource) Read(
Expand All @@ -150,8 +155,7 @@ func (s *fileSystemSource) Read(

// NB(r): Perform all data bootstrapping first then index bootstrapping
// to more clearly delineate which process is slower than the other.
nowFn := s.opts.ResultOptions().ClockOptions().NowFn()
start := nowFn()
start := s.nowFn()
dataLogFields := []zapcore.Field{
zap.Stringer("cachePolicy", s.opts.ResultOptions().SeriesCachePolicy()),
}
Expand All @@ -164,7 +168,7 @@ func (s *fileSystemSource) Read(

r, err := s.read(bootstrapDataRunType, md, namespace.DataAccumulator,
namespace.DataRunOptions.ShardTimeRanges,
namespace.DataRunOptions.RunOptions, builder)
namespace.DataRunOptions.RunOptions, builder, span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand All @@ -176,10 +180,10 @@ func (s *fileSystemSource) Read(
})
}
s.log.Info("bootstrapping time series data success",
append(dataLogFields, zap.Duration("took", nowFn().Sub(start)))...)
append(dataLogFields, zap.Duration("took", s.nowFn().Sub(start)))...)
span.LogEvent("bootstrap_data_done")

start = nowFn()
start = s.nowFn()
s.log.Info("bootstrapping index metadata start")
span.LogEvent("bootstrap_index_start")
for _, elem := range namespaces.Namespaces.Iter() {
Expand All @@ -194,7 +198,7 @@ func (s *fileSystemSource) Read(

r, err := s.read(bootstrapIndexRunType, md, namespace.DataAccumulator,
namespace.IndexRunOptions.ShardTimeRanges,
namespace.IndexRunOptions.RunOptions, builder)
namespace.IndexRunOptions.RunOptions, builder, span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand All @@ -210,18 +214,18 @@ func (s *fileSystemSource) Read(
results.Results.Set(md.ID(), result)
}
s.log.Info("bootstrapping index metadata success",
zap.Duration("took", nowFn().Sub(start)))
zap.Duration("took", s.nowFn().Sub(start)))
span.LogEvent("bootstrap_index_done")

return results, nil
}

func (s *fileSystemSource) availability(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
) (result.ShardTimeRanges, error) {
result := result.NewShardTimeRangesFromSize(shardsTimeRanges.Len())
for shard, ranges := range shardsTimeRanges.Iter() {
result := result.NewShardTimeRangesFromSize(shardTimeRanges.Len())
for shard, ranges := range shardTimeRanges.Iter() {
result.Set(shard, s.shardAvailability(md.ID(), shard, ranges))
}
return result, nil
Expand Down Expand Up @@ -459,9 +463,8 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
var (
indexBlockSize = ns.Options().IndexOptions().BlockSize()
retentionPeriod = ns.Options().RetentionOptions().RetentionPeriod()
nowFn = s.opts.ResultOptions().ClockOptions().NowFn()
beginningOfIndexRetention = retention.FlushTimeStartForRetentionPeriod(
retentionPeriod, indexBlockSize, nowFn())
retentionPeriod, indexBlockSize, s.nowFn())
initialIndexRange = xtime.Range{
Start: beginningOfIndexRetention,
End: beginningOfIndexRetention.Add(indexBlockSize),
Expand Down Expand Up @@ -674,15 +677,16 @@ func (s *fileSystemSource) read(
run runType,
md namespace.Metadata,
accumulator bootstrap.NamespaceDataAccumulator,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
builder *result.IndexBuilder,
span opentracing.Span,
) (*runResult, error) {
var (
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy()
res *runResult
)
if shardsTimeRanges.IsEmpty() {
if shardTimeRanges.IsEmpty() {
return newRunResult(), nil
}

Expand All @@ -701,25 +705,34 @@ func (s *fileSystemSource) read(
if seriesCachePolicy != series.CacheAll {
// Unless we're caching all series (or all series metadata) in memory, we
// return just the availability of the files we have.
return s.bootstrapDataRunResultFromAvailability(md, shardsTimeRanges), nil
return s.bootstrapDataRunResultFromAvailability(md, shardTimeRanges), nil
}
}

logSpan := func(event string) {
span.LogFields(
opentracinglog.String("event", event),
opentracinglog.String("nsID", md.ID().String()),
opentracinglog.String("shardTimeRanges", shardTimeRanges.SummaryString()),
)
}
if run == bootstrapIndexRunType {
logSpan("bootstrap_from_index_persisted_blocks_start")
// NB(r): First read all the FSTs and add to runResult index results,
// subtract the shard + time ranges from what we intend to bootstrap
// for those we found.
r, err := s.bootstrapFromIndexPersistedBlocks(md,
shardsTimeRanges)
shardTimeRanges)
if err != nil {
s.log.Warn("filesystem bootstrapped failed to read persisted index blocks")
} else {
// We may have less we need to read
shardsTimeRanges = shardsTimeRanges.Copy()
shardsTimeRanges.Subtract(r.fulfilled)
shardTimeRanges = shardTimeRanges.Copy()
shardTimeRanges.Subtract(r.fulfilled)
// Set or merge result.
setOrMergeResult(r.result)
}
logSpan("bootstrap_from_index_persisted_blocks_done")
}

// Create a reader pool once per bootstrap as we don't really want to
Expand All @@ -737,8 +750,22 @@ func (s *fileSystemSource) read(
panic(fmt.Errorf("unrecognized run type: %d", run))
}
runtimeOpts := s.opts.RuntimeOptionsManager().Get()
go bootstrapper.EnqueueReaders(md, runOpts, runtimeOpts, s.fsopts, shardsTimeRanges,
readerPool, readersCh, blockSize, s.log)
go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{
NsMD: md,
RunOpts: runOpts,
RuntimeOpts: runtimeOpts,
FsOpts: s.fsopts,
ShardTimeRanges: shardTimeRanges,
ReaderPool: readerPool,
ReadersCh: readersCh,
BlockSize: blockSize,
// NB(bodu): We only read metadata when bootstrap index
// so we do not need to sort the data fileset reader.
OptimizedReadMetadataOnly: run == bootstrapIndexRunType,
Logger: s.log,
Span: span,
NowFn: s.nowFn,
})
bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md,
accumulator, runOpts, readerPool, readersCh, builder)

Expand All @@ -755,11 +782,11 @@ func (s *fileSystemSource) newReader() (fs.DataFileSetReader, error) {

func (s *fileSystemSource) bootstrapDataRunResultFromAvailability(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
) *runResult {
runResult := newRunResult()
unfulfilled := runResult.data.Unfulfilled()
for shard, ranges := range shardsTimeRanges.Iter() {
for shard, ranges := range shardTimeRanges.Iter() {
if ranges.IsEmpty() {
continue
}
Expand All @@ -784,7 +811,7 @@ type bootstrapFromIndexPersistedBlocksResult struct {

func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
) (bootstrapFromIndexPersistedBlocksResult, error) {
res := bootstrapFromIndexPersistedBlocksResult{
fulfilled: result.NewShardTimeRanges(),
Expand All @@ -799,7 +826,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
s.log.Error("unable to read index info file",
zap.Stringer("namespace", ns.ID()),
zap.Error(err),
zap.Stringer("shardsTimeRanges", shardsTimeRanges),
zap.Stringer("shardTimeRanges", shardTimeRanges),
zap.String("filepath", infoFile.Err.Filepath()),
)
continue
Expand All @@ -813,7 +840,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
}
willFulfill := result.NewShardTimeRanges()
for _, shard := range info.Shards {
tr, ok := shardsTimeRanges.Get(shard)
tr, ok := shardTimeRanges.Get(shard)
if !ok {
// No ranges match for this shard.
continue
Expand Down
Loading