diff --git a/src/dbnode/integration/commitlog_bootstrap_helpers.go b/src/dbnode/integration/commitlog_bootstrap_helpers.go index 0bc98de968..e9456ff1e2 100644 --- a/src/dbnode/integration/commitlog_bootstrap_helpers.go +++ b/src/dbnode/integration/commitlog_bootstrap_helpers.go @@ -106,8 +106,8 @@ func writeCommitLogData( data generate.SeriesBlocksByStart, namespace namespace.Metadata, genSnapshots bool, -) { - writeCommitLogDataBase(t, s, opts, data, namespace, nil, nil) +) int { + return writeCommitLogDataBase(t, s, opts, data, namespace, nil, nil) } func writeCommitLogDataSpecifiedTS( @@ -118,8 +118,8 @@ func writeCommitLogDataSpecifiedTS( namespace namespace.Metadata, ts time.Time, genSnapshots bool, -) { - writeCommitLogDataBase(t, s, opts, data, namespace, &ts, nil) +) int { + return writeCommitLogDataBase(t, s, opts, data, namespace, &ts, nil) } func writeCommitLogDataWithPredicate( @@ -129,10 +129,11 @@ func writeCommitLogDataWithPredicate( data generate.SeriesBlocksByStart, namespace namespace.Metadata, pred generate.WriteDatapointPredicate, -) { - writeCommitLogDataBase(t, s, opts, data, namespace, nil, pred) +) int { + return writeCommitLogDataBase(t, s, opts, data, namespace, nil, pred) } +// returns the number of data points written to the commit log func writeCommitLogDataBase( t *testing.T, s TestSetup, @@ -141,7 +142,7 @@ func writeCommitLogDataBase( namespace namespace.Metadata, specifiedTS *time.Time, pred generate.WriteDatapointPredicate, -) { +) int { if pred == nil { pred = generate.WriteAllPredicate } @@ -155,6 +156,7 @@ func writeCommitLogDataBase( shardSet = s.ShardSet() tagEncoderPool = opts.FilesystemOptions().TagEncoderPool() tagSliceIter = ident.NewTagsIterator(ident.Tags{}) + writes int ) // Write out commit log data. @@ -203,12 +205,14 @@ func writeCommitLogDataBase( } if pred(point.Value) { require.NoError(t, commitLog.Write(ctx, cID, point.Value.Datapoint, xtime.Second, point.Value.Annotation)) + writes++ } } // ensure writes finished. require.NoError(t, commitLog.Close()) } + return writes } func writeSnapshotsWithPredicate( diff --git a/src/dbnode/integration/commitlog_bootstrap_test.go b/src/dbnode/integration/commitlog_bootstrap_test.go index d6ab5e76be..8826584f6f 100644 --- a/src/dbnode/integration/commitlog_bootstrap_test.go +++ b/src/dbnode/integration/commitlog_bootstrap_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) func TestCommitLogBootstrap(t *testing.T) { @@ -79,7 +80,7 @@ func testCommitLogBootstrap(t *testing.T, setTestOpts setTestOptions, updateInpu now := setup.NowFn()() seriesMaps := generateSeriesMaps(30, updateInputConfig, now.Add(-2*blockSize), now.Add(-blockSize)) log.Info("writing data") - writeCommitLogData(t, setup, commitLogOpts, seriesMaps, ns1, false) + writes := writeCommitLogData(t, setup, commitLogOpts, seriesMaps, ns1, false) log.Info("finished writing data") // Setup bootstrapper after writing data so filesystem inspection can find it. @@ -109,4 +110,6 @@ func testCommitLogBootstrap(t *testing.T, setTestOpts setTestOptions, updateInpu observedSeriesMaps2 := testSetupToSeriesMaps(t, setup, ns2, metadatasByShard2) verifySeriesMapsEqual(t, emptySeriesMaps, observedSeriesMaps2) + counters := commitLogOpts.InstrumentOptions().MetricsScope().(tally.TestScope).Snapshot().Counters() + require.Equal(t, writes, int(counters["bootstrapper-commitlog.commitlog.entries-read+"].Value())) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 91af1ba4a3..d89aca7d06 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -46,6 +46,7 @@ import ( "github.com/m3db/m3/src/x/pool" xtime "github.com/m3db/m3/src/x/time" + "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -72,6 +73,11 @@ type commitLogSource struct { newReaderFn newReaderFn metrics commitLogSourceMetrics + // Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the + // entire log needs to be read irrespective of the configured time ranges for the pass. The commit log only needs + // to be read once (during the first pass) and the results can be subsequently cached and returned on future passes. + // Since the bootstrapper is single threaded this does not need to be guarded with a mutex. + commitLogResult commitLogResult } type bootstrapNamespace struct { @@ -177,14 +183,10 @@ func (s *commitLogSource) Read( var ( // Emit bootstrapping gauge for duration of ReadData. - doneReadingData = s.metrics.emitBootstrapping() - encounteredCorruptData = false - fsOpts = s.opts.CommitLogOptions().FilesystemOptions() - filePathPrefix = fsOpts.FilePathPrefix() - namespaceIter = namespaces.Namespaces.Iter() - namespaceResults = make(map[string]*readNamespaceResult, len(namespaceIter)) - setInitialTopologyState bool - initialTopologyState *topology.StateSnapshot + doneReadingData = s.metrics.emitBootstrapping() + fsOpts = s.opts.CommitLogOptions().FilesystemOptions() + filePathPrefix = fsOpts.FilePathPrefix() + namespaceIter = namespaces.Namespaces.Iter() ) defer doneReadingData() @@ -206,17 +208,6 @@ func (s *commitLogSource) Read( shardTimeRanges.AddRanges(ns.IndexRunOptions.TargetShardTimeRanges) } - namespaceResults[ns.Metadata.ID().String()] = &readNamespaceResult{ - namespace: ns, - dataAndIndexShardRanges: shardTimeRanges, - } - - // Make the initial topology state available. - if !setInitialTopologyState { - setInitialTopologyState = true - initialTopologyState = ns.DataRunOptions.RunOptions.InitialTopologyState() - } - // Determine which snapshot files are available. snapshotFilesByShard, err := s.snapshotFilesByShard( ns.Metadata.ID(), filePathPrefix, shardTimeRanges) @@ -246,6 +237,53 @@ func (s *commitLogSource) Read( zap.Duration("took", s.nowFn().Sub(startSnapshotsRead))) span.LogEvent("read_snapshots_done") + if !s.commitLogResult.read { + var err error + s.commitLogResult, err = s.readCommitLog(namespaces, span) + if err != nil { + return bootstrap.NamespaceResults{}, err + } + } else { + s.log.Debug("commit log already read in a previous pass, using previous result.") + } + + bootstrapResult := bootstrap.NamespaceResults{ + Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}), + } + for _, elem := range namespaceIter { + ns := elem.Value() + id := ns.Metadata.ID() + dataResult := result.NewDataBootstrapResult() + if s.commitLogResult.shouldReturnUnfulfilled { + shardTimeRanges := ns.DataRunOptions.ShardTimeRanges + dataResult = shardTimeRanges.ToUnfulfilledDataResult() + } + var indexResult result.IndexBootstrapResult + if ns.Metadata.Options().IndexOptions().Enabled() { + indexResult = result.NewIndexBootstrapResult() + if s.commitLogResult.shouldReturnUnfulfilled { + shardTimeRanges := ns.IndexRunOptions.ShardTimeRanges + indexResult = shardTimeRanges.ToUnfulfilledIndexResult() + } + } + bootstrapResult.Results.Set(id, bootstrap.NamespaceResult{ + Metadata: ns.Metadata, + Shards: ns.Shards, + DataResult: dataResult, + IndexResult: indexResult, + }) + } + + return bootstrapResult, nil +} + +type commitLogResult struct { + shouldReturnUnfulfilled bool + // ensures we only read the commit log once + read bool +} + +func (s *commitLogSource) readCommitLog(namespaces bootstrap.Namespaces, span opentracing.Span) (commitLogResult, error) { // Setup the series accumulator pipeline. var ( numWorkers = s.opts.AccumulateConcurrency() @@ -270,6 +308,37 @@ func (s *commitLogSource) Read( // NB(r): Ensure that channels always get closed. defer closeWorkerChannels() + var ( + namespaceIter = namespaces.Namespaces.Iter() + namespaceResults = make(map[string]*readNamespaceResult, len(namespaceIter)) + setInitialTopologyState bool + initialTopologyState *topology.StateSnapshot + ) + for _, elem := range namespaceIter { + ns := elem.Value() + + // NB(r): Combine all shard time ranges across data and index + // so we can do in one go. + shardTimeRanges := result.NewShardTimeRanges() + // NB(bodu): Use TargetShardTimeRanges which covers the entire original target shard range + // since the commitlog bootstrapper should run for the entire bootstrappable range per shard. + shardTimeRanges.AddRanges(ns.DataRunOptions.TargetShardTimeRanges) + if ns.Metadata.Options().IndexOptions().Enabled() { + shardTimeRanges.AddRanges(ns.IndexRunOptions.TargetShardTimeRanges) + } + + namespaceResults[ns.Metadata.ID().String()] = &readNamespaceResult{ + namespace: ns, + dataAndIndexShardRanges: shardTimeRanges, + } + + // Make the initial topology state available. + if !setInitialTopologyState { + setInitialTopologyState = true + initialTopologyState = ns.DataRunOptions.RunOptions.InitialTopologyState() + } + } + // Setup the commit log iterator. var ( iterOpts = commitlog.IteratorOpts{ @@ -285,6 +354,7 @@ func (s *commitLogSource) Read( datapointsSkippedNotBootstrappingShard = 0 datapointsSkippedShardNoLongerOwned = 0 startCommitLogsRead = s.nowFn() + encounteredCorruptData = false ) s.log.Info("read commit logs start") span.LogEvent("read_commitlogs_start") @@ -305,7 +375,7 @@ func (s *commitLogSource) Read( iter, corruptFiles, err := s.newIteratorFn(iterOpts) if err != nil { err = fmt.Errorf("unable to create commit log iterator: %v", err) - return bootstrap.NamespaceResults{}, err + return commitLogResult{}, err } if len(corruptFiles) > 0 { @@ -349,6 +419,7 @@ func (s *commitLogSource) Read( // to read. var lastFileReadID uint64 for iter.Next() { + s.metrics.commitLogEntriesRead.Inc(1) entry := iter.Current() currFileReadID := entry.Metadata.FileReadID @@ -446,7 +517,7 @@ func (s *commitLogSource) Read( commitLogSeries[seriesKey] = seriesMapEntry{shardNoLongerOwned: true} continue } - return bootstrap.NamespaceResults{}, err + return commitLogResult{}, err } seriesEntry = seriesMapEntry{ @@ -518,36 +589,9 @@ func (s *commitLogSource) Read( shouldReturnUnfulfilled, err := s.shouldReturnUnfulfilled( workers, encounteredCorruptData, initialTopologyState) if err != nil { - return bootstrap.NamespaceResults{}, err - } - - bootstrapResult := bootstrap.NamespaceResults{ - Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}), + return commitLogResult{}, err } - for _, ns := range namespaceResults { - id := ns.namespace.Metadata.ID() - dataResult := result.NewDataBootstrapResult() - if shouldReturnUnfulfilled { - shardTimeRanges := ns.namespace.DataRunOptions.ShardTimeRanges - dataResult = shardTimeRanges.ToUnfulfilledDataResult() - } - var indexResult result.IndexBootstrapResult - if ns.namespace.Metadata.Options().IndexOptions().Enabled() { - indexResult = result.NewIndexBootstrapResult() - if shouldReturnUnfulfilled { - shardTimeRanges := ns.namespace.IndexRunOptions.ShardTimeRanges - indexResult = shardTimeRanges.ToUnfulfilledIndexResult() - } - } - bootstrapResult.Results.Set(id, bootstrap.NamespaceResult{ - Metadata: ns.namespace.Metadata, - Shards: ns.namespace.Shards, - DataResult: dataResult, - IndexResult: indexResult, - }) - } - - return bootstrapResult, nil + return commitLogResult{shouldReturnUnfulfilled: shouldReturnUnfulfilled, read: true}, nil } func (s *commitLogSource) snapshotFilesByShard( @@ -1069,12 +1113,14 @@ func (s *commitLogSource) shardsReplicated( type commitLogSourceMetrics struct { corruptCommitlogFile tally.Counter bootstrapping tally.Gauge + commitLogEntriesRead tally.Counter } func newCommitLogSourceMetrics(scope tally.Scope) commitLogSourceMetrics { return commitLogSourceMetrics{ corruptCommitlogFile: scope.SubScope("commitlog").Counter("corrupt"), bootstrapping: scope.SubScope("status").Gauge("bootstrapping"), + commitLogEntriesRead: scope.SubScope("commitlog").Counter("entries-read"), } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index b51069f090..0147f0d620 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -87,6 +87,59 @@ func TestReadEmpty(t *testing.T) { tester.EnsureNoWrites() } +func TestReadOnlyOnce(t *testing.T) { + opts := testDefaultOpts + md := testNsMetadata(t) + nsCtx := namespace.NewContextFrom(md) + src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) + + blockSize := md.Options().RetentionOptions().BlockSize() + now := time.Now() + start := now.Truncate(blockSize).Add(-blockSize) + end := now.Truncate(blockSize) + + ranges := xtime.NewRanges(xtime.Range{Start: start, End: end}) + + foo := ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")} + bar := ts.Series{Namespace: nsCtx.ID, Shard: 1, ID: ident.StringID("bar")} + baz := ts.Series{Namespace: nsCtx.ID, Shard: 2, ID: ident.StringID("baz")} + + values := testValues{ + {foo, start, 1.0, xtime.Second, nil}, + {foo, start.Add(1 * time.Minute), 2.0, xtime.Second, nil}, + {bar, start.Add(2 * time.Minute), 1.0, xtime.Second, nil}, + {bar, start.Add(3 * time.Minute), 2.0, xtime.Second, nil}, + // "baz" is in shard 2 and should not be returned + {baz, start.Add(4 * time.Minute), 1.0, xtime.Second, nil}, + } + + var commitLogReads int + src.newIteratorFn = func( + _ commitlog.IteratorOpts, + ) (commitlog.Iterator, []commitlog.ErrorWithPath, error) { + commitLogReads++ + return newTestCommitLogIterator(values, nil), nil, nil + } + + targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges) + tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md) + defer tester.Finish() + + // simulate 2 passes over the commit log + for i := 0; i < 2; i++ { + tester.TestReadWith(src) + tester.TestUnfulfilledForNamespaceIsEmpty(md) + + read := tester.EnsureDumpWritesForNamespace(md) + require.Equal(t, 2, len(read)) + enforceValuesAreCorrect(t, values[:4], read) + tester.EnsureNoLoadedBlocks() + } + + // commit log should only be iterated over once. + require.Equal(t, 1, commitLogReads) +} + func TestReadErrorOnNewIteratorError(t *testing.T) { opts := testDefaultOpts src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource)