diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 9805f06170..68bef48ef8 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -50,6 +50,9 @@ var ( // errReadNotExpectedSize returned when the size of the next read does not match size specified by the index errReadNotExpectedSize = errors.New("next read not expected size") + + // errReadMetadataOptimizedForRead returned when we optimized for only reading metadata but are attempting a regular read + errReadMetadataOptimizedForRead = errors.New("read metadata optimized for regular read") ) const ( @@ -99,6 +102,10 @@ type reader struct { shard uint32 volume int open bool + // NB(bodu): Informs whether or not we optimize for only reading + // metadata. We don't need to sort for reading metadata but sorting is + // required if we are performing regulars reads. + optimizedReadMetadataOnly bool } // NewReader returns a new reader and expects all files to exist. Will read the @@ -271,6 +278,7 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { r.open = true r.namespace = namespace r.shard = shard + r.optimizedReadMetadataOnly = opts.OptimizedReadMetadataOnly return nil } @@ -337,13 +345,20 @@ func (r *reader) readIndexAndSortByOffsetAsc() error { } r.indexEntriesByOffsetAsc = append(r.indexEntriesByOffsetAsc, entry) } - // NB(r): As we decode each block we need access to each index entry - // in the order we decode the data - sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc)) + // This is false by default so we always sort unless otherwise specified. + if !r.optimizedReadMetadataOnly { + // NB(r): As we decode each block we need access to each index entry + // in the order we decode the data. This is only required for regular reads. + sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc)) + } return nil } func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + // NB(bodu): We cannot perform regular reads if we're optimizing for only reading metadata. + if r.optimizedReadMetadataOnly { + return nil, nil, nil, 0, errReadMetadataOptimizedForRead + } if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries { // Have not read the index yet, this is required when reading // data as we need each index entry in order by by the offset ascending diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 7c15b6d56e..dd6d255cb1 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -122,6 +122,10 @@ type DataFileSetReaderStatus struct { type DataReaderOpenOptions struct { Identifier FileSetFileIdentifier FileSetType persist.FileSetType + // NB(bodu): This option can inform the reader to optimize for reading + // only metadata by not sorting index entries. Setting this option will + // throw an error if a regular `Read()` is attempted. + OptimizedReadMetadataOnly bool } // DataFileSetReader provides an unsynchronized reader for a TSDB file set diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index aee996bb5d..2a804462d1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" @@ -48,6 +49,8 @@ import ( "github.com/m3db/m3/src/x/pool" xtime "github.com/m3db/m3/src/x/time" + "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -69,6 +72,7 @@ type fileSystemSource struct { opts Options fsopts fs.Options log *zap.Logger + nowFn clock.NowFn idPool ident.Pool newReaderFn newDataFileSetReaderFn newReaderPoolOpts bootstrapper.NewReaderPoolOptions @@ -96,6 +100,7 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) { opts: opts, fsopts: opts.FilesystemOptions(), log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), idPool: opts.IdentifierPool(), newReaderFn: fs.NewReader, persistManager: &bootstrapper.SharedPersistManager{ @@ -116,18 +121,18 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) { func (s *fileSystemSource) AvailableData( md namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { - return s.availability(md, shardsTimeRanges) + return s.availability(md, shardTimeRanges) } func (s *fileSystemSource) AvailableIndex( md namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { - return s.availability(md, shardsTimeRanges) + return s.availability(md, shardTimeRanges) } func (s *fileSystemSource) Read( @@ -150,8 +155,7 @@ func (s *fileSystemSource) Read( // NB(r): Perform all data bootstrapping first then index bootstrapping // to more clearly deliniate which process is slower than the other. - nowFn := s.opts.ResultOptions().ClockOptions().NowFn() - start := nowFn() + start := s.nowFn() dataLogFields := []zapcore.Field{ zap.Stringer("cachePolicy", s.opts.ResultOptions().SeriesCachePolicy()), } @@ -164,7 +168,7 @@ func (s *fileSystemSource) Read( r, err := s.read(bootstrapDataRunType, md, namespace.DataAccumulator, namespace.DataRunOptions.ShardTimeRanges, - namespace.DataRunOptions.RunOptions, builder) + namespace.DataRunOptions.RunOptions, builder, span) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -176,10 +180,10 @@ func (s *fileSystemSource) Read( }) } s.log.Info("bootstrapping time series data success", - append(dataLogFields, zap.Duration("took", nowFn().Sub(start)))...) + append(dataLogFields, zap.Duration("took", s.nowFn().Sub(start)))...) span.LogEvent("bootstrap_data_done") - start = nowFn() + start = s.nowFn() s.log.Info("bootstrapping index metadata start") span.LogEvent("bootstrap_index_start") for _, elem := range namespaces.Namespaces.Iter() { @@ -194,7 +198,7 @@ func (s *fileSystemSource) Read( r, err := s.read(bootstrapIndexRunType, md, namespace.DataAccumulator, namespace.IndexRunOptions.ShardTimeRanges, - namespace.IndexRunOptions.RunOptions, builder) + namespace.IndexRunOptions.RunOptions, builder, span) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -210,7 +214,7 @@ func (s *fileSystemSource) Read( results.Results.Set(md.ID(), result) } s.log.Info("bootstrapping index metadata success", - zap.Duration("took", nowFn().Sub(start))) + zap.Duration("took", s.nowFn().Sub(start))) span.LogEvent("bootstrap_index_done") return results, nil @@ -218,10 +222,10 @@ func (s *fileSystemSource) Read( func (s *fileSystemSource) availability( md namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, ) (result.ShardTimeRanges, error) { - result := result.NewShardTimeRangesFromSize(shardsTimeRanges.Len()) - for shard, ranges := range shardsTimeRanges.Iter() { + result := result.NewShardTimeRangesFromSize(shardTimeRanges.Len()) + for shard, ranges := range shardTimeRanges.Iter() { result.Set(shard, s.shardAvailability(md.ID(), shard, ranges)) } return result, nil @@ -459,9 +463,8 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( var ( indexBlockSize = ns.Options().IndexOptions().BlockSize() retentionPeriod = ns.Options().RetentionOptions().RetentionPeriod() - nowFn = s.opts.ResultOptions().ClockOptions().NowFn() beginningOfIndexRetention = retention.FlushTimeStartForRetentionPeriod( - retentionPeriod, indexBlockSize, nowFn()) + retentionPeriod, indexBlockSize, s.nowFn()) initialIndexRange = xtime.Range{ Start: beginningOfIndexRetention, End: beginningOfIndexRetention.Add(indexBlockSize), @@ -674,15 +677,16 @@ func (s *fileSystemSource) read( run runType, md namespace.Metadata, accumulator bootstrap.NamespaceDataAccumulator, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, builder *result.IndexBuilder, + span opentracing.Span, ) (*runResult, error) { var ( seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() res *runResult ) - if shardsTimeRanges.IsEmpty() { + if shardTimeRanges.IsEmpty() { return newRunResult(), nil } @@ -701,25 +705,34 @@ func (s *fileSystemSource) read( if seriesCachePolicy != series.CacheAll { // Unless we're caching all series (or all series metadata) in memory, we // return just the availability of the files we have. - return s.bootstrapDataRunResultFromAvailability(md, shardsTimeRanges), nil + return s.bootstrapDataRunResultFromAvailability(md, shardTimeRanges), nil } } + logSpan := func(event string) { + span.LogFields( + opentracinglog.String("event", event), + opentracinglog.String("nsID", md.ID().String()), + opentracinglog.String("shardTimeRanges", shardTimeRanges.SummaryString()), + ) + } if run == bootstrapIndexRunType { + logSpan("bootstrap_from_index_persisted_blocks_start") // NB(r): First read all the FSTs and add to runResult index results, // subtract the shard + time ranges from what we intend to bootstrap // for those we found. r, err := s.bootstrapFromIndexPersistedBlocks(md, - shardsTimeRanges) + shardTimeRanges) if err != nil { s.log.Warn("filesystem bootstrapped failed to read persisted index blocks") } else { // We may have less we need to read - shardsTimeRanges = shardsTimeRanges.Copy() - shardsTimeRanges.Subtract(r.fulfilled) + shardTimeRanges = shardTimeRanges.Copy() + shardTimeRanges.Subtract(r.fulfilled) // Set or merge result. setOrMergeResult(r.result) } + logSpan("bootstrap_from_index_persisted_blocks_done") } // Create a reader pool once per bootstrap as we don't really want to @@ -737,8 +750,22 @@ func (s *fileSystemSource) read( panic(fmt.Errorf("unrecognized run type: %d", run)) } runtimeOpts := s.opts.RuntimeOptionsManager().Get() - go bootstrapper.EnqueueReaders(md, runOpts, runtimeOpts, s.fsopts, shardsTimeRanges, - readerPool, readersCh, blockSize, s.log) + go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{ + NsMD: md, + RunOpts: runOpts, + RuntimeOpts: runtimeOpts, + FsOpts: s.fsopts, + ShardTimeRanges: shardTimeRanges, + ReaderPool: readerPool, + ReadersCh: readersCh, + BlockSize: blockSize, + // NB(bodu): We only read metadata when bootstrap index + // so we do not need to sort the data fileset reader. + OptimizedReadMetadataOnly: run == bootstrapIndexRunType, + Logger: s.log, + Span: span, + NowFn: s.nowFn, + }) bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md, accumulator, runOpts, readerPool, readersCh, builder) @@ -755,11 +782,11 @@ func (s *fileSystemSource) newReader() (fs.DataFileSetReader, error) { func (s *fileSystemSource) bootstrapDataRunResultFromAvailability( md namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, ) *runResult { runResult := newRunResult() unfulfilled := runResult.data.Unfulfilled() - for shard, ranges := range shardsTimeRanges.Iter() { + for shard, ranges := range shardTimeRanges.Iter() { if ranges.IsEmpty() { continue } @@ -784,7 +811,7 @@ type bootstrapFromIndexPersistedBlocksResult struct { func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( ns namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, ) (bootstrapFromIndexPersistedBlocksResult, error) { res := bootstrapFromIndexPersistedBlocksResult{ fulfilled: result.NewShardTimeRanges(), @@ -799,7 +826,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( s.log.Error("unable to read index info file", zap.Stringer("namespace", ns.ID()), zap.Error(err), - zap.Stringer("shardsTimeRanges", shardsTimeRanges), + zap.Stringer("shardTimeRanges", shardTimeRanges), zap.String("filepath", infoFile.Err.Filepath()), ) continue @@ -813,7 +840,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( } willFulfill := result.NewShardTimeRanges() for _, shard := range info.Shards { - tr, ok := shardsTimeRanges.Get(shard) + tr, ok := shardTimeRanges.Get(shard) if !ok { // No ranges match for this shard. continue diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 410b7a6220..585c4903ba 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -44,11 +44,11 @@ import ( idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" - "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -94,24 +94,24 @@ type shardPeerAvailability struct { func (s *peersSource) AvailableData( nsMetadata namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { if err := s.validateRunOpts(runOpts); err != nil { return nil, err } - return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) + return s.peerAvailability(nsMetadata, shardTimeRanges, runOpts) } func (s *peersSource) AvailableIndex( nsMetadata namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { if err := s.validateRunOpts(runOpts); err != nil { return nil, err } - return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) + return s.peerAvailability(nsMetadata, shardTimeRanges, runOpts) } func (s *peersSource) Read( @@ -144,8 +144,7 @@ func (s *peersSource) Read( // NB(r): Perform all data bootstrapping first then index bootstrapping // to more clearly deliniate which process is slower than the other. - nowFn := s.opts.ResultOptions().ClockOptions().NowFn() - start := nowFn() + start := s.nowFn() s.log.Info("bootstrapping time series data start") span.LogEvent("bootstrap_data_start") for _, elem := range namespaces.Namespaces.Iter() { @@ -166,7 +165,7 @@ func (s *peersSource) Read( }) } s.log.Info("bootstrapping time series data success", - zap.Duration("took", nowFn().Sub(start))) + zap.Duration("took", s.nowFn().Sub(start))) span.LogEvent("bootstrap_data_done") alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() @@ -176,7 +175,7 @@ func (s *peersSource) Read( } builder := result.NewIndexBuilder(segBuilder) - start = nowFn() + start = s.nowFn() s.log.Info("bootstrapping index metadata start") span.LogEvent("bootstrap_index_start") for _, elem := range namespaces.Namespaces.Iter() { @@ -193,7 +192,8 @@ func (s *peersSource) Read( r, err := s.readIndex(md, namespace.IndexRunOptions.ShardTimeRanges, builder, - namespace.IndexRunOptions.RunOptions) + namespace.IndexRunOptions.RunOptions, + span) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -210,7 +210,7 @@ func (s *peersSource) Read( results.Results.Set(md.ID(), result) } s.log.Info("bootstrapping index metadata success", - zap.Duration("took", nowFn().Sub(start))) + zap.Duration("took", s.nowFn().Sub(start))) span.LogEvent("bootstrap_index_done") return results, nil @@ -219,14 +219,14 @@ func (s *peersSource) Read( func (s *peersSource) readData( nsMetadata namespace.Metadata, accumulator bootstrap.NamespaceDataAccumulator, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, opts bootstrap.RunOptions, ) (result.DataBootstrapResult, error) { if err := s.validateRunOpts(opts); err != nil { return nil, err } - if shardsTimeRanges.IsEmpty() { + if shardTimeRanges.IsEmpty() { return result.NewDataBootstrapResult(), nil } @@ -266,7 +266,7 @@ func (s *peersSource) readData( session, err := s.opts.AdminClient().DefaultAdminSession() if err != nil { s.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) - result.SetUnfulfilled(shardsTimeRanges) + result.SetUnfulfilled(shardTimeRanges) return nil, err } @@ -277,7 +277,7 @@ func (s *peersSource) readData( persistenceMaxQueueSize = s.opts.PersistenceMaxQueueSize() persistenceQueue = make(chan persistenceFlush, persistenceMaxQueueSize) resultOpts = s.opts.ResultOptions() - count = shardsTimeRanges.Len() + count = shardTimeRanges.Len() concurrency = s.opts.DefaultShardConcurrency() blockSize = nsMetadata.Options().RetentionOptions().BlockSize() ) @@ -296,7 +296,7 @@ func (s *peersSource) readData( workers := xsync.NewWorkerPool(concurrency) workers.Init() - for shard, ranges := range shardsTimeRanges.Iter() { + for shard, ranges := range shardTimeRanges.Iter() { shard, ranges := shard, ranges wg.Add(1) workers.Go(func() { @@ -657,9 +657,10 @@ func (s *peersSource) flush( func (s *peersSource) readIndex( ns namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, builder *result.IndexBuilder, opts bootstrap.RunOptions, + span opentracing.Span, ) (result.IndexBootstrapResult, error) { if err := s.validateRunOpts(opts); err != nil { return nil, err @@ -668,12 +669,12 @@ func (s *peersSource) readIndex( // FOLLOWUP(r): Try to reuse any metadata fetched during the ReadData(...) // call rather than going to the network again r := result.NewIndexBootstrapResult() - if shardsTimeRanges.IsEmpty() { + if shardTimeRanges.IsEmpty() { return r, nil } var ( - count = shardsTimeRanges.Len() + count = shardTimeRanges.Len() indexBlockSize = ns.Options().IndexOptions().BlockSize() runtimeOpts = s.opts.RuntimeOptionsManager().Get() fsOpts = s.opts.FilesystemOptions() @@ -691,8 +692,22 @@ func (s *peersSource) readIndex( zap.Int("shards", count), ) - go bootstrapper.EnqueueReaders(ns, opts, runtimeOpts, fsOpts, shardsTimeRanges, readerPool, - readersCh, indexBlockSize, s.log) + go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{ + NsMD: ns, + RunOpts: opts, + RuntimeOpts: runtimeOpts, + FsOpts: fsOpts, + ShardTimeRanges: shardTimeRanges, + ReaderPool: readerPool, + ReadersCh: readersCh, + BlockSize: indexBlockSize, + // NB(bodu): We only read metadata when performing a peers bootstrap + // so we do not need to sort the data fileset reader. + OptimizedReadMetadataOnly: true, + Logger: s.log, + Span: span, + NowFn: s.nowFn, + }) for timeWindowReaders := range readersCh { // NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks, @@ -970,7 +985,7 @@ func (s *peersSource) readBlockMetadataAndIndex( func (s *peersSource) peerAvailability( nsMetadata namespace.Metadata, - shardsTimeRanges result.ShardTimeRanges, + shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { var ( @@ -978,7 +993,7 @@ func (s *peersSource) peerAvailability( initialTopologyState = runOpts.InitialTopologyState() ) - for shardIDUint := range shardsTimeRanges.Iter() { + for shardIDUint := range shardTimeRanges.Iter() { shardID := topology.ShardID(shardIDUint) shardPeers, ok := peerAvailabilityByShard[shardID] if !ok { @@ -1025,7 +1040,7 @@ func (s *peersSource) peerAvailability( majorityReplicas = initialTopologyState.MajorityReplicas availableShardTimeRanges = result.NewShardTimeRanges() ) - for shardIDUint := range shardsTimeRanges.Iter() { + for shardIDUint := range shardTimeRanges.Iter() { var ( shardID = topology.ShardID(shardIDUint) shardPeers = peerAvailabilityByShard[shardID] @@ -1057,7 +1072,7 @@ func (s *peersSource) peerAvailability( // all the data. This assumption is safe, as the shard/block ranges // will simply be marked unfulfilled if the peers are not able to // satisfy the requests. - if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + if tr, ok := shardTimeRanges.Get(shardIDUint); ok { availableShardTimeRanges.Set(shardIDUint, tr) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/readers.go b/src/dbnode/storage/bootstrap/bootstrapper/readers.go index 16e93c61a2..7259ae4cb0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/readers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/readers.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" @@ -32,7 +33,10 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" xtime "github.com/m3db/m3/src/x/time" + "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // TimeWindowReaders are grouped by data block. @@ -59,24 +63,41 @@ func newTimeWindowReaders( } } +// EnqueueReadersOptions supplies options to enqueue readers. +type EnqueueReadersOptions struct { + NsMD namespace.Metadata + RunOpts bootstrap.RunOptions + RuntimeOpts runtime.Options + FsOpts fs.Options + ShardTimeRanges result.ShardTimeRanges + ReaderPool *ReaderPool + ReadersCh chan<- TimeWindowReaders + BlockSize time.Duration + OptimizedReadMetadataOnly bool + Logger *zap.Logger + Span opentracing.Span + NowFn clock.NowFn +} + // EnqueueReaders into a readers channel grouped by data block. -func EnqueueReaders( - ns namespace.Metadata, - runOpts bootstrap.RunOptions, - runtimeOpts runtime.Options, - fsOpts fs.Options, - shardsTimeRanges result.ShardTimeRanges, - readerPool *ReaderPool, - readersCh chan<- TimeWindowReaders, - blockSize time.Duration, - logger *zap.Logger, -) { +func EnqueueReaders(opts EnqueueReadersOptions) { // Close the readers ch if and only if all readers are enqueued. - defer close(readersCh) + defer close(opts.ReadersCh) // Normal run, open readers - enqueueReadersGroupedByBlockSize(ns, runOpts, fsOpts, - shardsTimeRanges, readerPool, readersCh, blockSize, logger) + enqueueReadersGroupedByBlockSize( + opts.NsMD, + opts.RunOpts, + opts.FsOpts, + opts.ShardTimeRanges, + opts.ReaderPool, + opts.ReadersCh, + opts.BlockSize, + opts.OptimizedReadMetadataOnly, + opts.Logger, + opts.Span, + opts.NowFn, + ) } func enqueueReadersGroupedByBlockSize( @@ -87,7 +108,10 @@ func enqueueReadersGroupedByBlockSize( readerPool *ReaderPool, readersCh chan<- TimeWindowReaders, blockSize time.Duration, + optimizedReadMetadataOnly bool, logger *zap.Logger, + span opentracing.Span, + nowFn clock.NowFn, ) { // Group them by block size. groupFn := NewShardTimeRangesTimeWindowGroups @@ -97,7 +121,8 @@ func enqueueReadersGroupedByBlockSize( for _, group := range groupedByBlockSize { readers := make(map[ShardID]ShardReaders, group.Ranges.Len()) for shard, tr := range group.Ranges.Iter() { - shardReaders := newShardReaders(ns, fsOpts, readerPool, shard, tr, logger) + shardReaders := newShardReaders(ns, fsOpts, readerPool, shard, tr, + optimizedReadMetadataOnly, logger, span, nowFn) readers[ShardID(shard)] = shardReaders } readersCh <- newTimeWindowReaders(group.Ranges, readers) @@ -110,15 +135,40 @@ func newShardReaders( readerPool *ReaderPool, shard uint32, tr xtime.Ranges, + optimizedReadMetadataOnly bool, logger *zap.Logger, + span opentracing.Span, + nowFn clock.NowFn, ) ShardReaders { + logSpan := func(event string) { + span.LogFields( + opentracinglog.String("event", event), + opentracinglog.Uint32("shard", shard), + opentracinglog.String("tr", tr.String()), + ) + } + logFields := []zapcore.Field{ + zap.Uint32("shard", shard), + zap.String("tr", tr.String()), + } + + start := nowFn() + logger.Debug("enqueue readers read info files start", logFields...) + logSpan("enqueue_readers_read_info_files_start") readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard, fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType) + logger.Debug("enqueue readers read info files done", + append(logFields, zap.Duration("took", nowFn().Sub(start)))...) + logSpan("enqueue_readers_read_info_files_done") + if len(readInfoFilesResults) == 0 { // No readers. return ShardReaders{} } + start = nowFn() + logger.Debug("enqueue readers open data readers start", logFields...) + logSpan("enqueue_readers_open_data_readers_start") readers := make([]fs.DataFileSetReader, 0, len(readInfoFilesResults)) for i := 0; i < len(readInfoFilesResults); i++ { result := readInfoFilesResults[i] @@ -160,6 +210,7 @@ func newShardReaders( Shard: shard, BlockStart: blockStart, }, + OptimizedReadMetadataOnly: optimizedReadMetadataOnly, } if err := r.Open(openOpts); err != nil { logger.Error("unable to open fileset files", @@ -175,6 +226,9 @@ func newShardReaders( readers = append(readers, r) } + logger.Debug("enqueue readers open data readers done", + append(logFields, zap.Duration("took", nowFn().Sub(start)))...) + logSpan("enqueue_readers_open_data_readers_done") return ShardReaders{Readers: readers} }