Skip to content

Commit

Permalink
Improve code structure and comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Sep 27, 2018
1 parent c7ca7b5 commit ef9e684
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,44 +485,49 @@ func (s *peersSource) flush(
}
}

if seriesCachePolicy == series.CacheAll ||
// We only want to retain the series metadata in one of three cases:
// 1) CacheAll caching policy (because we're expected to cache everything in memory)
// 2) CacheAllMetadata caching policy (because we're expected to cache all metadata in memory)
// 3) PersistConfig.FileSetType is set to FileSetSnapshotType because that means we're bootstrapping
// an active block that we'll want to perform a flush on later, and we're only flushing here for
// the sake of allowing the commit log bootstrapper to be able to recover this data if the node
// goes down in-between this bootstrapper completing and the subsequent flush.
shouldRetainSeriesMetadata := seriesCachePolicy == series.CacheAll ||
seriesCachePolicy == series.CacheAllMetadata ||
persistConfig.FileSetType == persist.FileSetSnapshotType {
// In all three of these cases we still need to keep all of the metadata in memory
// so we can return early.
return nil
}
persistConfig.FileSetType == persist.FileSetSnapshotType

if !shouldRetainSeriesMetadata {
// If we're not going to keep all of the data, or at least all of the metadata in memory
// then we don't want to keep these series in the shard result. If we leave them in, then
// they will all get loaded into the shard object, and then immediately evicted on the next
// tick which causes unnecessary memory pressure.
numSeriesTriedToRemoveWithRemainingBlocks := 0
for _, entry := range shardResult.AllSeries().Iter() {
series := entry.Value()
numBlocksRemaining := len(series.Blocks.AllBlocks())
// Should never happen since we removed all the block in the previous loop and fetching
// bootstrap blocks should always be exclusive on the end side.
if numBlocksRemaining > 0 {
numSeriesTriedToRemoveWithRemainingBlocks++
continue
}

// If we're not going to keep all of the data, or at least all of the metadata in memory
// then we don't want to keep these series in the shard result. If we leave them in, then
// they will all get loaded into the shard object, and then immediately evicted on the next
// tick which causes unnecessary memory pressure.
numSeriesTriedToRemoveWithRemainingBlocks := 0
for _, entry := range shardResult.AllSeries().Iter() {
series := entry.Value()
numBlocksRemaining := len(series.Blocks.AllBlocks())
// Should never happen since we removed all the block in the previous loop and fetching
// bootstrap blocks should always be exclusive on the end side.
if numBlocksRemaining > 0 {
numSeriesTriedToRemoveWithRemainingBlocks++
continue
shardResult.RemoveSeries(series.ID)
series.Blocks.Close()
// Safe to finalize these IDs and Tags because the prepared object was the only other thing
// using them, and it has been closed.
series.ID.Finalize()
series.Tags.Finalize()
}
if numSeriesTriedToRemoveWithRemainingBlocks > 0 {
iOpts := s.opts.ResultOptions().InstrumentOptions()
instrument.EmitInvariantViolationAndGetLogger(iOpts).
WithFields(
xlog.NewField("start", tr.Start.Unix()),
xlog.NewField("end", tr.End.Unix()),
xlog.NewField("numTimes", numSeriesTriedToRemoveWithRemainingBlocks),
).Error("error tried to remove series that still has blocks")
}

shardResult.RemoveSeries(series.ID)
series.Blocks.Close()
// Safe to finalize these IDs and Tags because the prepared object was the only other thing
// using them, and it has been closed.
series.ID.Finalize()
series.Tags.Finalize()
}
if numSeriesTriedToRemoveWithRemainingBlocks > 0 {
iOpts := s.opts.ResultOptions().InstrumentOptions()
instrument.EmitInvariantViolationAndGetLogger(iOpts).
WithFields(
xlog.NewField("start", tr.Start.Unix()),
xlog.NewField("end", tr.End.Unix()),
xlog.NewField("numTimes", numSeriesTriedToRemoveWithRemainingBlocks),
).Error("error tried to remove series that still has blocks")
}

return nil
Expand Down

0 comments on commit ef9e684

Please sign in to comment.