Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only read the commit log once during bootstrapping #2645

Merged
merged 6 commits into from
Sep 17, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 97 additions & 51 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand All @@ -72,6 +73,11 @@ type commitLogSource struct {
newReaderFn newReaderFn

metrics commitLogSourceMetrics
// Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the
// entire log needs to be read irrespective of the configured time ranges for the pass. The commit log only needs
// to be read once (during the first pass) and the results can be subsequently cached and returned on future passes.
// Since the bootstrapper is single threaded this does not need to be guarded with a mutex.
commitLogResult commitLogResult
}

type bootstrapNamespace struct {
Expand Down Expand Up @@ -177,14 +183,10 @@ func (s *commitLogSource) Read(

var (
// Emit bootstrapping gauge for duration of ReadData.
doneReadingData = s.metrics.emitBootstrapping()
encounteredCorruptData = false
fsOpts = s.opts.CommitLogOptions().FilesystemOptions()
filePathPrefix = fsOpts.FilePathPrefix()
namespaceIter = namespaces.Namespaces.Iter()
namespaceResults = make(map[string]*readNamespaceResult, len(namespaceIter))
setInitialTopologyState bool
initialTopologyState *topology.StateSnapshot
doneReadingData = s.metrics.emitBootstrapping()
fsOpts = s.opts.CommitLogOptions().FilesystemOptions()
filePathPrefix = fsOpts.FilePathPrefix()
namespaceIter = namespaces.Namespaces.Iter()
)
defer doneReadingData()

Expand All @@ -206,17 +208,6 @@ func (s *commitLogSource) Read(
shardTimeRanges.AddRanges(ns.IndexRunOptions.TargetShardTimeRanges)
}

namespaceResults[ns.Metadata.ID().String()] = &readNamespaceResult{
namespace: ns,
dataAndIndexShardRanges: shardTimeRanges,
}

// Make the initial topology state available.
if !setInitialTopologyState {
setInitialTopologyState = true
initialTopologyState = ns.DataRunOptions.RunOptions.InitialTopologyState()
}

// Determine which snapshot files are available.
snapshotFilesByShard, err := s.snapshotFilesByShard(
ns.Metadata.ID(), filePathPrefix, shardTimeRanges)
Expand Down Expand Up @@ -246,10 +237,56 @@ func (s *commitLogSource) Read(
zap.Duration("took", s.nowFn().Sub(startSnapshotsRead)))
span.LogEvent("read_snapshots_done")

commitLogResult, err := s.readCommitLog(namespaces, span)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately the diff is not great. I essentially moved all of the commit log reading logic into a new method, readCommitLog. Additionally, I changed the for loop below to iterate over the provided namespaces rather than the namespace results from the commit log. It didn't seem necessary to use the results in the for loop, and this allowed the results to remain private to the new method.

if err != nil {
return bootstrap.NamespaceResults{}, err
}

bootstrapResult := bootstrap.NamespaceResults{
Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}),
}
for _, elem := range namespaceIter {
ns := elem.Value()
id := ns.Metadata.ID()
dataResult := result.NewDataBootstrapResult()
if commitLogResult.shouldReturnUnfulfilled {
shardTimeRanges := ns.DataRunOptions.ShardTimeRanges
dataResult = shardTimeRanges.ToUnfulfilledDataResult()
}
var indexResult result.IndexBootstrapResult
if ns.Metadata.Options().IndexOptions().Enabled() {
indexResult = result.NewIndexBootstrapResult()
if commitLogResult.shouldReturnUnfulfilled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could move up the nesting of this logic so we don't need to perform the same check twice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can, since you're already inside a conditional branch for IndexOptions.

shardTimeRanges := ns.IndexRunOptions.ShardTimeRanges
indexResult = shardTimeRanges.ToUnfulfilledIndexResult()
}
}
bootstrapResult.Results.Set(id, bootstrap.NamespaceResult{
Metadata: ns.Metadata,
Shards: ns.Shards,
DataResult: dataResult,
IndexResult: indexResult,
})
}

return bootstrapResult, nil
}

// commitLogResult is the cached outcome of a single read of the commit log,
// stored on commitLogSource so subsequent bootstrap passes can reuse it
// instead of re-reading the entire commit log.
type commitLogResult struct {
	// shouldReturnUnfulfilled indicates whether the bootstrap results built
	// from this read should mark their shard time ranges as unfulfilled
	// (e.g. after encountering corrupt data).
	shouldReturnUnfulfilled bool
	// read is true once the commit log has been read; readCommitLog checks
	// this to skip the read and return the cached result on later passes.
	read bool
}

func (s *commitLogSource) readCommitLog(namespaces bootstrap.Namespaces, span opentracing.Span) (commitLogResult, error) {
if s.commitLogResult.read {
s.log.Debug("commit log already read in a previous pass. skipping and returning previous result. ")
return s.commitLogResult, nil
}

// Setup the series accumulator pipeline.
var (
numWorkers = s.opts.AccumulateConcurrency()
workers = make([]*accumulateWorker, 0, numWorkers)
numWorkers = s.opts.AccumulateConcurrency()
workers = make([]*accumulateWorker, 0, numWorkers)
)
for i := 0; i < numWorkers; i++ {
worker := &accumulateWorker{
Expand All @@ -270,6 +307,37 @@ func (s *commitLogSource) Read(
// NB(r): Ensure that channels always get closed.
defer closeWorkerChannels()

var(
namespaceIter = namespaces.Namespaces.Iter()
namespaceResults = make(map[string]*readNamespaceResult, len(namespaceIter))
setInitialTopologyState bool
initialTopologyState *topology.StateSnapshot
)
for _, elem := range namespaceIter {
ns := elem.Value()

// NB(r): Combine all shard time ranges across data and index
// so we can do in one go.
shardTimeRanges := result.NewShardTimeRanges()
// NB(bodu): Use TargetShardTimeRanges which covers the entire original target shard range
// since the commitlog bootstrapper should run for the entire bootstrappable range per shard.
shardTimeRanges.AddRanges(ns.DataRunOptions.TargetShardTimeRanges)
if ns.Metadata.Options().IndexOptions().Enabled() {
shardTimeRanges.AddRanges(ns.IndexRunOptions.TargetShardTimeRanges)
}

namespaceResults[ns.Metadata.ID().String()] = &readNamespaceResult{
namespace: ns,
dataAndIndexShardRanges: shardTimeRanges,
}

// Make the initial topology state available.
if !setInitialTopologyState {
setInitialTopologyState = true
initialTopologyState = ns.DataRunOptions.RunOptions.InitialTopologyState()
}
}

// Setup the commit log iterator.
var (
iterOpts = commitlog.IteratorOpts{
Expand All @@ -285,6 +353,7 @@ func (s *commitLogSource) Read(
datapointsSkippedNotBootstrappingShard = 0
datapointsSkippedShardNoLongerOwned = 0
startCommitLogsRead = s.nowFn()
encounteredCorruptData = false
)
s.log.Info("read commit logs start")
span.LogEvent("read_commitlogs_start")
Expand All @@ -305,7 +374,7 @@ func (s *commitLogSource) Read(
iter, corruptFiles, err := s.newIteratorFn(iterOpts)
if err != nil {
err = fmt.Errorf("unable to create commit log iterator: %v", err)
return bootstrap.NamespaceResults{}, err
return commitLogResult{}, err
}

if len(corruptFiles) > 0 {
Expand Down Expand Up @@ -446,7 +515,7 @@ func (s *commitLogSource) Read(
commitLogSeries[seriesKey] = seriesMapEntry{shardNoLongerOwned: true}
continue
}
return bootstrap.NamespaceResults{}, err
return commitLogResult{}, err
}

seriesEntry = seriesMapEntry{
Expand Down Expand Up @@ -518,36 +587,13 @@ func (s *commitLogSource) Read(
shouldReturnUnfulfilled, err := s.shouldReturnUnfulfilled(
workers, encounteredCorruptData, initialTopologyState)
if err != nil {
return bootstrap.NamespaceResults{}, err
return commitLogResult{}, err
}

bootstrapResult := bootstrap.NamespaceResults{
Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}),
}
for _, ns := range namespaceResults {
id := ns.namespace.Metadata.ID()
dataResult := result.NewDataBootstrapResult()
if shouldReturnUnfulfilled {
shardTimeRanges := ns.namespace.DataRunOptions.ShardTimeRanges
dataResult = shardTimeRanges.ToUnfulfilledDataResult()
}
var indexResult result.IndexBootstrapResult
if ns.namespace.Metadata.Options().IndexOptions().Enabled() {
indexResult = result.NewIndexBootstrapResult()
if shouldReturnUnfulfilled {
shardTimeRanges := ns.namespace.IndexRunOptions.ShardTimeRanges
indexResult = shardTimeRanges.ToUnfulfilledIndexResult()
}
}
bootstrapResult.Results.Set(id, bootstrap.NamespaceResult{
Metadata: ns.namespace.Metadata,
Shards: ns.namespace.Shards,
DataResult: dataResult,
IndexResult: indexResult,
})
s.commitLogResult = commitLogResult{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It's a little awkward to set it here and use the set value as the return value. We also use the return value after the function call. Maybe it's better to either have no return or return the value and let the caller do the setting?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't have a strong opinion about this since it's all private local state anyway. I'll just update it so the main function controls the caching logic and decides whether it should actually call readCommitLog.

shouldReturnUnfulfilled: shouldReturnUnfulfilled,
read: true,
}

return bootstrapResult, nil
return s.commitLogResult, nil
}

func (s *commitLogSource) snapshotFilesByShard(
Expand Down